Synchronization fix for dmaap client 61/43361/1
authorbiniek <lukasz.biniek@nokia.com>
Tue, 17 Apr 2018 14:08:12 +0000 (16:08 +0200)
committerbiniek <lukasz.biniek@nokia.com>
Tue, 17 Apr 2018 14:08:12 +0000 (16:08 +0200)
Change-Id: Ibcad191dc0994c8c4498ebdbc82e4c1f694517bd
Issue-ID: SO-506
Signed-off-by: biniek <lukasz.biniek@nokia.com>
bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/InformDmaapClient.java
bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/DmaapClient.java [moved from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/implementation/DmaapClient.java with 94% similarity]
bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/delegate/DmaapClientTestImpl.java
bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java

index 5a8d741..edff36f 100644 (file)
@@ -22,7 +22,7 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.delegate;
 
 import org.camunda.bpm.engine.delegate.DelegateExecution;
 import org.camunda.bpm.engine.delegate.JavaDelegate;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient;
 import org.springframework.beans.factory.annotation.Autowired;
 
 public class InformDmaapClient implements JavaDelegate {
index 6871665..8c9903e 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
@@ -34,11 +35,10 @@ import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
 import org.openecomp.mso.jsonpath.JsonPathUtil;
 import org.openecomp.mso.logger.MsoLogger;
 
-public class PnfEventReadyConsumer implements Runnable, DmaapClient {
+public class PnfEventReadyConsumer implements DmaapClient {
 
     private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
 
@@ -54,9 +54,8 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
     private Map<String, Runnable> pnfCorrelationIdToThreadMap;
     private HttpGet getRequest;
     private ScheduledExecutorService executor;
-    private int dmaapClientInitialDelayInSeconds;
     private int dmaapClientDelayInSeconds;
-    private boolean dmaapThreadListenerIsRunning;
+    private volatile boolean dmaapThreadListenerIsRunning;
 
     public PnfEventReadyConsumer() {
         httpClient = HttpClientBuilder.create().build();
@@ -68,8 +67,9 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
         getRequest = new HttpGet(buildURI());
     }
 
-    @Override
-    public void run() {
+    //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
+    @VisibleForTesting
+    void sendRequest() {
         try {
             HttpResponse response = httpClient.execute(getRequest);
             getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
@@ -79,21 +79,23 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
     }
 
     @Override
-    public void registerForUpdate(String correlationId, Runnable informConsumer) {
+    public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
         pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
         if (!dmaapThreadListenerIsRunning) {
             startDmaapThreadListener();
         }
     }
 
-    private void startDmaapThreadListener() {
-        executor = Executors.newScheduledThreadPool(1);
-        executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds,
-                dmaapClientDelayInSeconds, TimeUnit.SECONDS);
-        dmaapThreadListenerIsRunning = true;
+    private synchronized void startDmaapThreadListener() {
+        if (!dmaapThreadListenerIsRunning) {
+            executor = Executors.newScheduledThreadPool(1);
+            executor.scheduleWithFixedDelay(this::sendRequest, 0,
+                    dmaapClientDelayInSeconds, TimeUnit.SECONDS);
+            dmaapThreadListenerIsRunning = true;
+        }
     }
 
-    private void stopDmaapThreadListener() {
+    private synchronized void stopDmaapThreadListener() {
         if (dmaapThreadListenerIsRunning) {
             executor.shutdownNow();
             dmaapThreadListenerIsRunning = false;
@@ -120,17 +122,14 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
     }
 
 
-    private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
-        pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny()
-                .ifPresent(this::informAboutPnfReady);
-    }
-
-    private void informAboutPnfReady(String correlationId) {
-        pnfCorrelationIdToThreadMap.get(correlationId).run();
-        pnfCorrelationIdToThreadMap.remove(correlationId);
+    private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+        Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId);
+        if (runnable != null) {
+            runnable.run();
 
-        if (pnfCorrelationIdToThreadMap.isEmpty()) {
-            stopDmaapThreadListener();
+            if (pnfCorrelationIdToThreadMap.isEmpty()) {
+                stopDmaapThreadListener();
+            }
         }
     }
 
@@ -162,10 +161,6 @@ public class PnfEventReadyConsumer implements Runnable, DmaapClient {
         this.consumerGroup = consumerGroup;
     }
 
-    public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) {
-        this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds;
-    }
-
     public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) {
         this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
     }
index 6fe70ff..33ba460 100644 (file)
@@ -28,7 +28,6 @@
     <property name="dmaapTopicName" value="${eventReadyTopicName}"/>\r
     <property name="consumerGroup" value="${consumerGroup}"/>\r
     <property name="consumerId" value="${consumerId}"/>\r
-    <property name="dmaapClientInitialDelayInSeconds" value="${clientThreadInitialDelayInSeconds}"/>\r
     <property name="dmaapClientDelayInSeconds" value="${clientThreadDelayInSeconds}"/>\r
   </bean>\r
 \r
index 1103597..55dd3a9 100644 (file)
@@ -20,7 +20,7 @@
 
 package org.openecomp.mso.bpmn.infrastructure.pnf.delegate;
 
-import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.DmaapClient;
 
 public class DmaapClientTestImpl implements DmaapClient {
     private String correlationId;
index ef8fa3d..73b8247 100644 (file)
@@ -76,7 +76,6 @@ public class PnfEventReadyConsumerTest {
         testedObject.setDmaapTopicName(EVENT_TOPIC_TEST);
         testedObject.setConsumerId(CONSUMER_ID);
         testedObject.setConsumerGroup(CONSUMER_GROUP);
-        testedObject.setDmaapClientInitialDelayInSeconds(1);
         testedObject.setDmaapClientDelayInSeconds(1);
         testedObject.init();
         httpClientMock = mock(HttpClient.class);
@@ -97,7 +96,7 @@ public class PnfEventReadyConsumerTest {
             throws IOException {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
-        testedObject.run();
+        testedObject.sendRequest();
         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
         verify(httpClientMock).execute(captor1.capture());
         assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
@@ -120,7 +119,7 @@ public class PnfEventReadyConsumerTest {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(
                         String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
-        testedObject.run();
+        testedObject.sendRequest();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }
 
@@ -134,7 +133,7 @@ public class PnfEventReadyConsumerTest {
     public void correlationIdIsNotFoundInHttpResponse() throws IOException {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
-        testedObject.run();
+        testedObject.sendRequest();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }