Fix for interrupted exception 81/66081/2
authorbiniek <lukasz.biniek@nokia.com>
Wed, 12 Sep 2018 10:55:45 +0000 (12:55 +0200)
committerbiniek <lukasz.biniek@nokia.com>
Wed, 12 Sep 2018 15:16:40 +0000 (17:16 +0200)
Change-Id: I8d16c9101acb92bd39208abd9e4724c252db3de6
Issue-ID: SO-722
Signed-off-by: biniek <lukasz.biniek@nokia.com>
bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java

index 353b4e3..70323b7 100644 (file)
@@ -26,8 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.core.UriBuilder;
@@ -49,10 +48,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
     private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA, PnfEventReadyDmaapClient.class);
 
-    private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId";
-    
-    @Autowired
-    private Environment env;
+    private final Environment env;
     private HttpClient httpClient;
     private String dmaapHost;
     private int dmaapPort;
@@ -63,10 +59,15 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
     private String consumerGroup;
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
     private HttpGet getRequest;
-    private ScheduledExecutorService executor;
     private int dmaapClientDelayInSeconds;
+    private volatile ScheduledThreadPoolExecutor executor;
     private volatile boolean dmaapThreadListenerIsRunning;
 
+    @Autowired
+    public PnfEventReadyDmaapClient(Environment env) {
+        this.env = env;
+    }
+
     public void init() {
         httpClient = HttpClientBuilder.create().build();
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
@@ -97,7 +98,9 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
     private synchronized void startDmaapThreadListener() {
         if (!dmaapThreadListenerIsRunning) {
-            executor = Executors.newScheduledThreadPool(1);
+            executor = new ScheduledThreadPoolExecutor(1);
+            executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
             executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
                     dmaapClientDelayInSeconds, TimeUnit.SECONDS);
             dmaapThreadListenerIsRunning = true;
@@ -106,7 +109,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
 
     private synchronized void stopDmaapThreadListener() {
         if (dmaapThreadListenerIsRunning) {
-            executor.shutdownNow();
+            executor.shutdown();
             dmaapThreadListenerIsRunning = false;
             executor = null;
         }
@@ -166,7 +169,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
             return Collections.emptyList();
         }
 
-        private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+        private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
             Runnable runnable = unregister(correlationId);
             if (runnable != null) {
                 LOGGER.debug("pnf ready event got from dmaap for correlationId: " + correlationId);
index 08ac9b6..9ae7ad9 100644 (file)
@@ -34,6 +34,7 @@ import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -82,16 +83,16 @@ public class PnfEventReadyDmaapClientTest {
     private static final String CONSUMER_GROUP = "consumerGroupTest";
     @Mock
     private Environment env;
-    @InjectMocks
-    private PnfEventReadyDmaapClient testedObject = new PnfEventReadyDmaapClient();
-;
+    private PnfEventReadyDmaapClient testedObject;
+
     private DmaapTopicListenerThread testedObjectInnerClassThread;
     private HttpClient httpClientMock;
     private Runnable threadMockToNotifyCamundaFlow;
-    private ScheduledExecutorService executorMock;
+    private ScheduledThreadPoolExecutor executorMock;
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
+        testedObject = new PnfEventReadyDmaapClient(env);
        when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
        when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
         testedObject.setDmaapProtocol(PROTOCOL);
@@ -104,7 +105,7 @@ public class PnfEventReadyDmaapClientTest {
         testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
         httpClientMock = mock(HttpClient.class);
         threadMockToNotifyCamundaFlow = mock(Runnable.class);
-        executorMock = mock(ScheduledExecutorService.class);
+        executorMock = mock(ScheduledThreadPoolExecutor.class);
         setPrivateField();
     }
 
@@ -127,7 +128,7 @@ public class PnfEventReadyDmaapClientTest {
                 .hasPath(
                         "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
         verify(threadMockToNotifyCamundaFlow).run();
-        verify(executorMock).shutdownNow();
+        verify(executorMock).shutdown();
     }
 
     /**