Fix sonar duplicate code issue 07/121507/3
authorJim Hahn <jrh3@att.com>
Fri, 21 May 2021 21:31:02 +0000 (17:31 -0400)
committerJim Hahn <jrh3@att.com>
Fri, 21 May 2021 21:43:43 +0000 (17:43 -0400)
Issue-ID: POLICY-3284
Change-Id: I78c3a8ac92e18e2b0088eb07e27a4e97866d6182
Signed-off-by: Jim Hahn <jrh3@att.com>
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java

index 5fb3aed..1e2c82b 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
  * ================================================================================
@@ -61,10 +61,58 @@ public interface BusConsumer {
      */
     public void close();
 
+    /**
+     * Consumer that handles fetch() failures by sleeping.
+     */
+    public abstract static class FetchingBusConsumer implements BusConsumer {
+        private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
+
+        /**
+         * Fetch timeout.
+         */
+        protected int fetchTimeout;
+
+        /**
+         * Counted down when {@link #close()} is invoked.
+         */
+        private CountDownLatch closeCondition = new CountDownLatch(1);
+
+
+        /**
+         * Constructs the object.
+         *
+         * @param busTopicParams parameters for the bus topic
+         */
+        protected FetchingBusConsumer(BusTopicParams busTopicParams) {
+            this.fetchTimeout = busTopicParams.getFetchTimeout();
+        }
+
+        /**
+         * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
+         * or the thread is interrupted, then this will return immediately.
+         */
+        protected void sleepAfterFetchFailure() {
+            try {
+                if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) {
+                    logger.info("{}: closed while handling fetch error", this);
+                }
+
+            } catch (InterruptedException e) {
+                logger.warn("{}: interrupted while handling fetch error", this, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        @Override
+        public void close() {
+            this.closeCondition.countDown();
+        }
+    }
+
     /**
      * Cambria based consumer.
      */
-    public static class CambriaConsumerWrapper implements BusConsumer {
+    public static class CambriaConsumerWrapper extends FetchingBusConsumer {
 
         /**
          * logger.
@@ -81,16 +129,6 @@ public interface BusConsumer {
          */
         private final CambriaConsumer consumer;
 
-        /**
-         * fetch timeout.
-         */
-        protected int fetchTimeout;
-
-        /**
-         * close condition.
-         */
-        protected CountDownLatch closeCondition = new CountDownLatch(1);
-
         /**
          * Cambria Consumer Wrapper.
          * BusTopicParam object contains the following parameters
@@ -108,8 +146,7 @@ public interface BusConsumer {
          * @throws MalformedURLException - Malformed URL exception
          */
         public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
-
-            this.fetchTimeout = busTopicParams.getFetchTimeout();
+            super(busTopicParams);
 
             this.builder = new CambriaClientBuilders.ConsumerBuilder();
 
@@ -155,19 +192,9 @@ public interface BusConsumer {
             }
         }
 
-        private void sleepAfterFetchFailure() {
-            try {
-                this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
-
-            } catch (InterruptedException e) {
-                logger.warn("{}: interrupted while handling fetch error", this, e);
-                Thread.currentThread().interrupt();
-            }
-        }
-
         @Override
         public void close() {
-            this.closeCondition.countDown();
+            super.close();
             this.consumer.close();
         }
 
@@ -180,7 +207,7 @@ public interface BusConsumer {
     /**
      * MR based consumer.
      */
-    public abstract class DmaapConsumerWrapper implements BusConsumer {
+    public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
 
         /**
          * logger.
@@ -192,16 +219,6 @@ public interface BusConsumer {
          */
         protected static final String PROTOCOL_PROP = "Protocol";
 
-        /**
-         * fetch timeout.
-         */
-        protected int fetchTimeout;
-
-        /**
-         * close condition.
-         */
-        protected CountDownLatch closeCondition = new CountDownLatch(1);
-
         /**
          * MR Consumer.
          */
@@ -225,8 +242,7 @@ public interface BusConsumer {
          * @throws MalformedURLException URL should be valid
          */
         protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
-            this.fetchTimeout = busTopicParams.getFetchTimeout();
+            super(busTopicParams);
 
             if (busTopicParams.isTopicInvalid()) {
                 throw new IllegalArgumentException("No topic for DMaaP");
@@ -277,19 +293,9 @@ public interface BusConsumer {
             }
         }
 
-        private void sleepAfterFetchFailure() {
-            try {
-                this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
-
-            } catch (InterruptedException e) {
-                logger.warn("{}: interrupted while handling fetch error", this, e);
-                Thread.currentThread().interrupt();
-            }
-        }
-
         @Override
         public void close() {
-            this.closeCondition.countDown();
+            super.close();
             this.consumer.close();
         }
 
index 82d5eef..aba0588 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 
 package org.onap.policy.common.endpoints.event.comm.bus.internal;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -33,6 +34,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import org.apache.commons.collections4.IteratorUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,16 +45,67 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.Camb
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
 import org.powermock.reflect.Whitebox;
 
 public class BusConsumerTest extends TopicTestBase {
 
+    private static final int SHORT_TIMEOUT_MILLIS = 10;
+    private static final int LONG_TIMEOUT_MILLIS = 3000;
+
     @Before
     @Override
     public void setUp() {
         super.setUp();
     }
 
+    @Test
+    public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
+
+        var cons = new FetchingBusConsumer(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
+
+            private CountDownLatch started = new CountDownLatch(1);
+
+            @Override
+            protected void sleepAfterFetchFailure() {
+                started.countDown();
+                super.sleepAfterFetchFailure();
+            }
+
+            @Override
+            public Iterable<String> fetch() throws IOException {
+                return null;
+            }
+        };
+
+        // full sleep
+        long tstart = System.currentTimeMillis();
+        cons.sleepAfterFetchFailure();
+        assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
+
+        // close while sleeping - sleep should halt prematurely
+        cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
+        cons.started = new CountDownLatch(1);
+        Thread thread = new Thread(cons::sleepAfterFetchFailure);
+        tstart = System.currentTimeMillis();
+        thread.start();
+        cons.started.await();
+        cons.close();
+        thread.join();
+        assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
+
+        // interrupt while sleeping - sleep should halt prematurely
+        cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
+        cons.started = new CountDownLatch(1);
+        thread = new Thread(cons::sleepAfterFetchFailure);
+        tstart = System.currentTimeMillis();
+        thread.start();
+        cons.started.await();
+        thread.interrupt();
+        thread.join();
+        assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
+    }
+
     @Test
     public void testCambriaConsumerWrapper() {
         // verify that different wrappers can be built