Wait after fetch exception on topic 03/122403/2
authorJim Hahn <jrh3@att.com>
Thu, 1 Jul 2021 15:41:00 +0000 (11:41 -0400)
committerJim Hahn <jrh3@att.com>
Fri, 2 Jul 2021 12:53:40 +0000 (08:53 -0400)
When dmaap is inaccessible for some reason, the topic source frequently
enters a fast fail loop, rapidly filling up the log.  Modified the code
to wait the configured fetchTimeout when this occurs.

With any luck, this will also fix the sporadic kubernetes crash-fail
loops sometimes seen with the xacml-pdp pod.

Modified to limit how long it will sleep after a failure, regardless of
the fetchTimeout that was specified.

Issue-ID: POLICY-3457
Change-Id: I88e360fb1d31197b46f4959e5ea0ea2d741ad25c
Signed-off-by: Jim Hahn <jrh3@att.com>
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java

index 1e2c82b..20f4c91 100644 (file)
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
@@ -72,10 +73,16 @@ public interface BusConsumer {
          */
         protected int fetchTimeout;
 
+        /**
+         * Time to sleep on a fetch failure.
+         */
+        @Getter
+        private final int sleepTime;
+
         /**
          * Counted down when {@link #close()} is invoked.
          */
-        private CountDownLatch closeCondition = new CountDownLatch(1);
+        private final CountDownLatch closeCondition = new CountDownLatch(1);
 
 
         /**
@@ -85,6 +92,13 @@ public interface BusConsumer {
          */
         protected FetchingBusConsumer(BusTopicParams busTopicParams) {
             this.fetchTimeout = busTopicParams.getFetchTimeout();
+
+            if (this.fetchTimeout <= 0) {
+                this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
+            } else {
+                // don't sleep too long, even if fetch timeout is large
+                this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+            }
         }
 
         /**
@@ -93,7 +107,8 @@ public interface BusConsumer {
          */
         protected void sleepAfterFetchFailure() {
             try {
-                if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) {
+                logger.info("{}: backoff for {}ms", this, sleepTime);
+                if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
                     logger.info("{}: closed while handling fetch error", this);
                 }
 
@@ -185,8 +200,7 @@ public interface BusConsumer {
             try {
                 return this.consumer.fetch();
             } catch (final IOException e) { //NOSONAR
-                logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
-                        this.fetchTimeout);
+                logger.error("{}: cannot fetch because of {}", this, e.getMessage());
                 sleepAfterFetchFailure();
                 throw e;
             }
index aba0588..21050f9 100644 (file)
@@ -46,6 +46,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.Dmaa
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.powermock.reflect.Whitebox;
 
 public class BusConsumerTest extends TopicTestBase {
@@ -59,10 +60,30 @@ public class BusConsumerTest extends TopicTestBase {
         super.setUp();
     }
 
+    @Test
+    public void testFetchingBusConsumer() throws InterruptedException {
+        // should not be negative
+        var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
+        assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+        // should not be zero
+        cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
+        assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+        // should not be too large
+        cons = new FetchingBusConsumerImpl(
+                        makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
+        assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+
+        // should not be what was specified
+        cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
+        assertThat(cons.getSleepTime()).isEqualTo(100);
+    }
+
     @Test
     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
 
-        var cons = new FetchingBusConsumer(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
+        var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
 
             private CountDownLatch started = new CountDownLatch(1);
 
@@ -71,11 +92,6 @@ public class BusConsumerTest extends TopicTestBase {
                 started.countDown();
                 super.sleepAfterFetchFailure();
             }
-
-            @Override
-            public Iterable<String> fetch() throws IOException {
-                return null;
-            }
         };
 
         // full sleep
@@ -278,4 +294,16 @@ public class BusConsumerTest extends TopicTestBase {
     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
     }
+
+    private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
+
+        protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
+            super(busTopicParams);
+        }
+
+        @Override
+        public Iterable<String> fetch() throws IOException {
+            return null;
+        }
+    }
 }