PnfReadyEventConsumer implementation 39/44639/1
authorLukasz Muszkieta <lukasz.muszkieta@nokia.com>
Wed, 25 Apr 2018 09:54:47 +0000 (11:54 +0200)
committerLukasz Muszkieta <lukasz.muszkieta@nokia.com>
Wed, 25 Apr 2018 09:58:28 +0000 (11:58 +0200)
Change-Id: Ic8d5814c555bad436bfcbe1b4e212637a6800947
Issue-ID: SO-466
Signed-off-by: Lukasz Muszkieta <lukasz.muszkieta@nokia.com>
bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java [moved from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java with 78% similarity]
bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java [moved from bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java with 93% similarity]

@@ -20,7 +20,6 @@
 
 package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
@@ -38,7 +37,7 @@ import org.apache.http.util.EntityUtils;
 import org.openecomp.mso.jsonpath.JsonPathUtil;
 import org.openecomp.mso.logger.MsoLogger;
 
-public class PnfEventReadyConsumer implements DmaapClient {
+public class PnfEventReadyDmaapClient implements DmaapClient {
 
     private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
 
@@ -57,7 +56,7 @@ public class PnfEventReadyConsumer implements DmaapClient {
     private int dmaapClientDelayInSeconds;
     private volatile boolean dmaapThreadListenerIsRunning;
 
-    public PnfEventReadyConsumer() {
+    public PnfEventReadyDmaapClient() {
         httpClient = HttpClientBuilder.create().build();
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         executor = null;
@@ -67,17 +66,6 @@ public class PnfEventReadyConsumer implements DmaapClient {
         getRequest = new HttpGet(buildURI());
     }
 
-    //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
-    @VisibleForTesting
-    void sendRequest() {
-        try {
-            HttpResponse response = httpClient.execute(getRequest);
-            getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
-        } catch (IOException e) {
-            LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
-        }
-    }
-
     @Override
     public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
         pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
@@ -98,7 +86,7 @@ public class PnfEventReadyConsumer implements DmaapClient {
     private synchronized void startDmaapThreadListener() {
         if (!dmaapThreadListenerIsRunning) {
             executor = Executors.newScheduledThreadPool(1);
-            executor.scheduleWithFixedDelay(this::sendRequest, 0,
+            executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
                     dmaapClientDelayInSeconds, TimeUnit.SECONDS);
             dmaapThreadListenerIsRunning = true;
         }
@@ -120,24 +108,6 @@ public class PnfEventReadyConsumer implements DmaapClient {
                 .path(consumerGroup).path(consumerId).build();
     }
 
-    private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
-        if (response.getStatusLine().getStatusCode() == 200) {
-            String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
-            if (responseString != null) {
-                return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
-            }
-        }
-        return Optional.empty();
-    }
-
-
-    private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
-        Runnable runnable = unregister(correlationId);
-        if (runnable != null) {
-            runnable.run();
-        }
-    }
-
     public void setDmaapHost(String dmaapHost) {
         this.dmaapHost = dmaapHost;
     }
@@ -170,4 +140,34 @@ public class PnfEventReadyConsumer implements DmaapClient {
         this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
     }
 
+    class DmaapTopicListenerThread implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                HttpResponse response = httpClient.execute(getRequest);
+                getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
+            } catch (IOException e) {
+                LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+            }
+        }
+
+        private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
+            if (response.getStatusLine().getStatusCode() == 200) {
+                String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+                if (responseString != null) {
+                    return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
+                }
+            }
+            return Optional.empty();
+        }
+
+        private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+            Runnable runnable = unregister(correlationId);
+            if (runnable != null) {
+                runnable.run();
+            }
+        }
+    }
+
 }
index 4ddeba1..13ab4f8 100644 (file)
   </bean>\r
 \r
   <bean id="informDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.InformDmaapClient">\r
-    <property name="dmaapClient" ref="pnfEventReadyConsumer"/>\r
+    <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>\r
   </bean>\r
 \r
   <bean id="cancelDmaapSubscription" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.CancelDmaapSubscription">\r
-    <property name="dmaapClient" ref="pnfEventReadyConsumer"/>\r
+    <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>\r
   </bean>\r
 \r
-  <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer"\r
+  <bean id="pnfEventReadyDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient"\r
     init-method="init">\r
     <property name="dmaapHost" value="${host}"/>\r
     <property name="dmaapPort" value="${port}"/>\r
@@ -43,8 +43,9 @@ import org.apache.http.message.BasicHttpResponse;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
 
-public class PnfEventReadyConsumerTest {
+public class PnfEventReadyDmaapClientTest {
 
     private static final String CORRELATION_ID = "corrTestId";
     private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
@@ -61,14 +62,15 @@ public class PnfEventReadyConsumerTest {
     private static final String CONSUMER_ID = "consumerTestId";
     private static final String CONSUMER_GROUP = "consumerGroupTest";
 
-    private PnfEventReadyConsumer testedObject;
+    private PnfEventReadyDmaapClient testedObject;
+    private DmaapTopicListenerThread testedObjectInnerClassThread;
     private HttpClient httpClientMock;
     private Runnable threadMockToNotifyCamundaFlow;
     private ScheduledExecutorService executorMock;
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        testedObject = new PnfEventReadyConsumer();
+        testedObject = new PnfEventReadyDmaapClient();
         testedObject.setDmaapHost(HOST);
         testedObject.setDmaapPort(PORT);
         testedObject.setDmaapProtocol(PROTOCOL);
@@ -78,6 +80,7 @@ public class PnfEventReadyConsumerTest {
         testedObject.setConsumerGroup(CONSUMER_GROUP);
         testedObject.setDmaapClientDelayInSeconds(1);
         testedObject.init();
+        testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
         httpClientMock = mock(HttpClient.class);
         threadMockToNotifyCamundaFlow = mock(Runnable.class);
         executorMock = mock(ScheduledExecutorService.class);
@@ -96,7 +99,7 @@ public class PnfEventReadyConsumerTest {
             throws IOException {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
-        testedObject.sendRequest();
+        testedObjectInnerClassThread.run();
         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
         verify(httpClientMock).execute(captor1.capture());
         assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
@@ -119,7 +122,7 @@ public class PnfEventReadyConsumerTest {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(
                         String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
-        testedObject.sendRequest();
+        testedObjectInnerClassThread.run();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }
 
@@ -133,7 +136,7 @@ public class PnfEventReadyConsumerTest {
     public void correlationIdIsNotFoundInHttpResponse() throws IOException {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
-        testedObject.sendRequest();
+        testedObjectInnerClassThread.run();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }