PnfReadyEventConsumer implementation 45/42545/4
authorLukasz Muszkieta <lukasz.muszkieta@nokia.com>
Thu, 12 Apr 2018 16:24:12 +0000 (18:24 +0200)
committerLukasz Muszkieta <lukasz.muszkieta@nokia.com>
Mon, 16 Apr 2018 08:19:26 +0000 (10:19 +0200)
Change-Id: I7252400a3f60ca22ddfa71edb28eaf1d16ccd9b4
Issue-ID: SO-466
Signed-off-by: Lukasz Muszkieta <lukasz.muszkieta@nokia.com>
bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties
bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
bpmn/MSOInfrastructureBPMN/src/test/resources/dmaapTest.properties [deleted file]
bpmn/MSOInfrastructureBPMN/src/test/resources/springConfig_PnfEventReadyConsumer.xml [deleted file]
common/src/main/java/org/openecomp/mso/logger/MsoLogger.java

index e6019f7..6871665 100644 (file)
@@ -22,17 +22,28 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.ws.rs.core.UriBuilder;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient;
+import org.openecomp.mso.jsonpath.JsonPathUtil;
+import org.openecomp.mso.logger.MsoLogger;
 
-public class PnfEventReadyConsumer {
+public class PnfEventReadyConsumer implements Runnable, DmaapClient {
+
+    private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
 
     private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId";
     private HttpClient httpClient;
-
     private String dmaapHost;
     private int dmaapPort;
     private String dmaapProtocol;
@@ -40,24 +51,57 @@ public class PnfEventReadyConsumer {
     private String dmaapTopicName;
     private String consumerId;
     private String consumerGroup;
+    private Map<String, Runnable> pnfCorrelationIdToThreadMap;
+    private HttpGet getRequest;
+    private ScheduledExecutorService executor;
+    private int dmaapClientInitialDelayInSeconds;
+    private int dmaapClientDelayInSeconds;
+    private boolean dmaapThreadListenerIsRunning;
 
     public PnfEventReadyConsumer() {
         httpClient = HttpClientBuilder.create().build();
+        pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
+        executor = null;
+    }
+
+    public void init() {
+        getRequest = new HttpGet(buildURI());
+    }
+
+    @Override
+    public void run() {
+        try {
+            HttpResponse response = httpClient.execute(getRequest);
+            getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
+        } catch (IOException e) {
+            LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+        }
+    }
+
+    @Override
+    public void registerForUpdate(String correlationId, Runnable informConsumer) {
+        pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
+        if (!dmaapThreadListenerIsRunning) {
+            startDmaapThreadListener();
+        }
     }
 
-    public void notifyWhenPnfReady(String correlationId)
-            throws IOException {
-        HttpGet getRequest = new HttpGet(buildURI(consumerGroup, consumerId));
-        HttpResponse response = httpClient.execute(getRequest);
-        checkIfResponseIsAccepted(response, correlationId);
+    private void startDmaapThreadListener() {
+        executor = Executors.newScheduledThreadPool(1);
+        executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds,
+                dmaapClientDelayInSeconds, TimeUnit.SECONDS);
+        dmaapThreadListenerIsRunning = true;
     }
 
-    private boolean checkIfResponseIsAccepted(HttpResponse response, String correlationId) {
-        // TODO parse response if contains proper correlationId
-        return false;
+    private void stopDmaapThreadListener() {
+        if (dmaapThreadListenerIsRunning) {
+            executor.shutdownNow();
+            dmaapThreadListenerIsRunning = false;
+            executor = null;
+        }
     }
 
-    private URI buildURI(String consumerGroup, String consumerId) {
+    private URI buildURI() {
         return UriBuilder.fromUri(dmaapUriPathPrefix)
                 .scheme(dmaapProtocol)
                 .host(dmaapHost)
@@ -65,6 +109,31 @@ public class PnfEventReadyConsumer {
                 .path(consumerGroup).path(consumerId).build();
     }
 
+    private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
+        if (response.getStatusLine().getStatusCode() == 200) {
+            String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+            if (responseString != null) {
+                return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
+            }
+        }
+        return Optional.empty();
+    }
+
+
+    private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+        pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny()
+                .ifPresent(this::informAboutPnfReady);
+    }
+
+    private void informAboutPnfReady(String correlationId) {
+        pnfCorrelationIdToThreadMap.get(correlationId).run();
+        pnfCorrelationIdToThreadMap.remove(correlationId);
+
+        if (pnfCorrelationIdToThreadMap.isEmpty()) {
+            stopDmaapThreadListener();
+        }
+    }
+
     public void setDmaapHost(String dmaapHost) {
         this.dmaapHost = dmaapHost;
     }
@@ -93,4 +162,12 @@ public class PnfEventReadyConsumer {
         this.consumerGroup = consumerGroup;
     }
 
+    public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) {
+        this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds;
+    }
+
+    public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) {
+        this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
+    }
+
 }
index 3c4dca4..5b1ffac 100644 (file)
@@ -1,7 +1,9 @@
-dmaapHost=HOSTNAME
-dmaapPort=3905
-dmaapProtocol=http
-dmaapUriPathPrefix = events
+host=HOSTNAME
+port=3905
+protocol=http
+uriPathPrefix = events
 eventReadyTopicName=pnfEventReady
 consumerId=consumerId
 consumerGroup=group
+clientThreadInitialDelayInSeconds=1
+clientThreadDelayInSeconds=5
\ No newline at end of file
index ed1556b..6fe70ff 100644 (file)
     <!--<property name="dmaapClient" ref="dmaapClient"/>-->\r
   </bean>\r
 \r
-  <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer">\r
-    <property name="dmaapHost" value="${dmaapHost}" />\r
-    <property name="dmaapPort" value="${dmaapPort}"/>\r
-    <property name="dmaapProtocol" value="${dmaapProtocol}"/>\r
-    <property name="dmaapUriPathPrefix" value="${dmaapUriPathPrefix}"/>\r
+  <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer"\r
+    init-method="init">\r
+    <property name="dmaapHost" value="${host}"/>\r
+    <property name="dmaapPort" value="${port}"/>\r
+    <property name="dmaapProtocol" value="${protocol}"/>\r
+    <property name="dmaapUriPathPrefix" value="${uriPathPrefix}"/>\r
     <property name="dmaapTopicName" value="${eventReadyTopicName}"/>\r
-    <property name= "consumerGroup" value="${consumerGroup}"/>\r
+    <property name="consumerGroup" value="${consumerGroup}"/>\r
     <property name="consumerId" value="${consumerId}"/>\r
+    <property name="dmaapClientInitialDelayInSeconds" value="${clientThreadInitialDelayInSeconds}"/>\r
+    <property name="dmaapClientDelayInSeconds" value="${clientThreadDelayInSeconds}"/>\r
   </bean>\r
 \r
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">\r
index 2f6a00d..ef8fa3d 100644 (file)
 package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolVersion;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.message.BasicHttpResponse;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringRunner;
 
-@RunWith(SpringRunner.class)
-@ContextConfiguration({"classpath:springConfig_PnfEventReadyConsumer.xml"})
 public class PnfEventReadyConsumerTest {
 
-    @Autowired
-    private PnfEventReadyConsumer pnfEventReadyConsumer;
+    private static final String CORRELATION_ID = "corrTestId";
+    private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
+    private static final String JSON_EXAMPLE_WITH_CORRELATION_ID =
+            "{\"pnfRegistrationFields\":{\"correlationId\":\"%s\"}}";
+    private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID =
+            "{\"pnfRegistrationFields\":{\"field\":\"value\"}}";
 
+    private static final String HOST = "hostTest";
+    private static final int PORT = 1234;
+    private static final String PROTOCOL = "http";
+    private static final String URI_PATH_PREFIX = "eventsForTesting";
+    private static final String EVENT_TOPIC_TEST = "eventTopicTest";
+    private static final String CONSUMER_ID = "consumerTestId";
+    private static final String CONSUMER_GROUP = "consumerGroupTest";
+
+    private PnfEventReadyConsumer testedObject;
     private HttpClient httpClientMock;
+    private Runnable threadMockToNotifyCamundaFlow;
+    private ScheduledExecutorService executorMock;
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
+        testedObject = new PnfEventReadyConsumer();
+        testedObject.setDmaapHost(HOST);
+        testedObject.setDmaapPort(PORT);
+        testedObject.setDmaapProtocol(PROTOCOL);
+        testedObject.setDmaapUriPathPrefix(URI_PATH_PREFIX);
+        testedObject.setDmaapTopicName(EVENT_TOPIC_TEST);
+        testedObject.setConsumerId(CONSUMER_ID);
+        testedObject.setConsumerGroup(CONSUMER_GROUP);
+        testedObject.setDmaapClientInitialDelayInSeconds(1);
+        testedObject.setDmaapClientDelayInSeconds(1);
+        testedObject.init();
         httpClientMock = mock(HttpClient.class);
+        threadMockToNotifyCamundaFlow = mock(Runnable.class);
+        executorMock = mock(ScheduledExecutorService.class);
         setPrivateField();
     }
 
+    /**
+     * Test run method, where the are following conditions:
+     * <p> - DmaapThreadListener is running, flag is set to true
+     * <p> - map is filled with one entry with the key that we get from response
+     * <p> run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
+     * and shutdown the executor because of empty map
+     */
     @Test
-    public void restClientInvokesWithProperURI() throws Exception {
+    public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady()
+            throws IOException {
+        when(httpClientMock.execute(any(HttpGet.class))).
+                thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
+        testedObject.run();
         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
-        pnfEventReadyConsumer.notifyWhenPnfReady("correlationId");
         verify(httpClientMock).execute(captor1.capture());
-        assertThat(captor1.getValue().getURI()).hasHost("hostTest").hasPort(1234).hasScheme("http")
-                .hasPath("/eventsForTesting/eventTopicTest/consumerGroupTest/consumerTestId");
+        assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
+                .hasPath(
+                        "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
+        verify(threadMockToNotifyCamundaFlow).run();
+        verify(executorMock).shutdownNow();
+    }
+
+    /**
+     * Test run method, where the are following conditions:
+     * <p> - DmaapThreadListener is running, flag is set to true
+     * <p> - map is filled with one entry with the correlationId that does not match to correlationId
+     * taken from http response. run method should not do anything with the map not run any thread to
+     * notify camunda process
+     */
+    @Test
+    public void correlationIdIsFoundInHttpResponse_NotFoundInMap()
+            throws IOException {
+        when(httpClientMock.execute(any(HttpGet.class))).
+                thenReturn(createResponse(
+                        String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
+        testedObject.run();
+        verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
+    }
+
+    /**
+     * Test run method, where the are following conditions:
+     * <p> - DmaapThreadListener is running, flag is set to true
+     * <p> - map is filled with one entry with the correlationId but no correlation id is taken from HttpResponse
+     * run method should not do anything with the map and not run any thread to notify camunda process
+     */
+    @Test
+    public void correlationIdIsNotFoundInHttpResponse() throws IOException {
+        when(httpClientMock.execute(any(HttpGet.class))).
+                thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
+        testedObject.run();
+        verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }
 
     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
-        Field field = pnfEventReadyConsumer.getClass().getDeclaredField("httpClient");
-        field.setAccessible(true);
-        field.set(pnfEventReadyConsumer, httpClientMock);
+        Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
+        httpClientField.setAccessible(true);
+        httpClientField.set(testedObject, httpClientMock);
+        httpClientField.setAccessible(false);
+
+        Field executorField = testedObject.getClass().getDeclaredField("executor");
+        executorField.setAccessible(true);
+        executorField.set(testedObject, executorMock);
+        executorField.setAccessible(false);
+
+        Field pnfCorrelationToThreadMapField = testedObject.getClass()
+                .getDeclaredField("pnfCorrelationIdToThreadMap");
+        pnfCorrelationToThreadMapField.setAccessible(true);
+        Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
+        pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow);
+        pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
+
+        Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
+        threadRunFlag.setAccessible(true);
+        threadRunFlag.set(testedObject, true);
+        threadRunFlag.setAccessible(false);
+    }
+
+    private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
+        HttpEntity entity = new StringEntity(json);
+        ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
+        HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
+        response.setEntity(entity);
+        response.setStatusCode(200);
+        return response;
     }
 
 }
diff --git a/bpmn/MSOInfrastructureBPMN/src/test/resources/dmaapTest.properties b/bpmn/MSOInfrastructureBPMN/src/test/resources/dmaapTest.properties
deleted file mode 100644 (file)
index a8df15c..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-dmaapHost=hostTest
-dmaapPort=1234
-dmaapProtocol=http
-dmaapUriPathPrefix = eventsForTesting
-eventReadyTopicName=eventTopicTest
-consumerId=consumerTestId
-consumerGroup=consumerGroupTest
\ No newline at end of file
diff --git a/bpmn/MSOInfrastructureBPMN/src/test/resources/springConfig_PnfEventReadyConsumer.xml b/bpmn/MSOInfrastructureBPMN/src/test/resources/springConfig_PnfEventReadyConsumer.xml
deleted file mode 100644 (file)
index 5abee9d..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-<beans xmlns="http://www.springframework.org/schema/beans"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans
-                         http://www.springframework.org/schema/beans/spring-beans.xsd">
-  <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer">
-    <property name="dmaapHost" value="${dmaapHost}" />
-    <property name="dmaapPort" value="${dmaapPort}"/>
-    <property name="dmaapProtocol" value="${dmaapProtocol}"/>
-    <property name="dmaapUriPathPrefix" value="${dmaapUriPathPrefix}"/>
-    <property name="dmaapTopicName" value="${eventReadyTopicName}"/>
-    <property name= "consumerGroup" value="${consumerGroup}"/>
-    <property name="consumerId" value="${consumerId}"/>
-  </bean>
-
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
-    <property name="locations" value="classpath:dmaapTest.properties"/>
-  </bean>
-
-</beans>
index 45f2746..9f91880 100644 (file)
@@ -279,6 +279,16 @@ public class MsoLogger {
         logger.debug(msg, t);
     }
 
+    /**
+     * Log error message with the details of the exception that caused the error.
+     * @param msg
+     * @param throwable
+     */
+    public void error(String msg, Throwable throwable) {
+        prepareMsg(ERROR_LEVEL);
+        logger.error(msg, throwable);
+    }
+
     // Info methods
     /**
      * Record the Info event