import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
*/
protected int fetchTimeout;
+ /**
+ * Time to sleep on a fetch failure.
+ */
+ @Getter
+ private final int sleepTime;
+
/**
* Counted down when {@link #close()} is invoked.
*/
- private CountDownLatch closeCondition = new CountDownLatch(1);
+ private final CountDownLatch closeCondition = new CountDownLatch(1);
/**
*/
protected FetchingBusConsumer(BusTopicParams busTopicParams) {
this.fetchTimeout = busTopicParams.getFetchTimeout();
+
+ if (this.fetchTimeout <= 0) {
+ this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
+ } else {
+ // don't sleep too long, even if fetch timeout is large
+ this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+ }
}
/**
*/
protected void sleepAfterFetchFailure() {
try {
- if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: backoff for {}ms", this, sleepTime);
+ if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
logger.info("{}: closed while handling fetch error", this);
}
try {
return this.consumer.fetch();
} catch (final IOException e) { //NOSONAR
- logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout);
+ logger.error("{}: cannot fetch because of {}", this, e.getMessage());
sleepAfterFetchFailure();
throw e;
}
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.powermock.reflect.Whitebox;
public class BusConsumerTest extends TopicTestBase {
super.setUp();
}
+ @Test
+ public void testFetchingBusConsumer() throws InterruptedException {
+ // should not be negative
+ var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be zero
+ cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be too large
+ cons = new FetchingBusConsumerImpl(
+ makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
+ assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+ // should not be what was specified
+ cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
+ assertThat(cons.getSleepTime()).isEqualTo(100);
+ }
+
@Test
public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
- var cons = new FetchingBusConsumer(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
+ var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
private CountDownLatch started = new CountDownLatch(1);
started.countDown();
super.sleepAfterFetchFailure();
}
-
- @Override
- public Iterable<String> fetch() throws IOException {
- return null;
- }
};
// full sleep
public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
}
+
+ private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
+
+ protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
+ super(busTopicParams);
+ }
+
+ @Override
+ public Iterable<String> fetch() throws IOException {
+ return null;
+ }
+ }
}