[SO] Remove DMaap Dependency in SO-bpmn-infra
[so.git] / bpmn / so-bpmn-infrastructure-common / src / test / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClientTest.java
index bbb6aad..546e644 100644 (file)
@@ -28,45 +28,35 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.ProtocolVersion;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.entity.InputStreamEntity;
-import org.apache.http.message.BasicHttpResponse;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
+import org.onap.so.client.kafka.KafkaConsumerImpl;
 import org.springframework.core.env.Environment;
 
+
 @RunWith(MockitoJUnitRunner.class)
 public class PnfEventReadyDmaapClientTest {
-
+    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
     private static final String PNF_CORRELATION_ID = "corrTestId";
     private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
-    private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"%s\","
-            + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]";
-
-    private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]";
+    private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID =
+            {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}",
+                    "{\"correlationId\": \"corr\",\"value\":\"value2\"}"};
 
-    private static final String HOST = "hostTest";
-    private static final int PORT = 1234;
-    private static final String PROTOCOL = "http";
-    private static final String URI_PATH_PREFIX = "eventsForTesting";
+    private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}";
     private static final String TOPIC_NAME = "unauthenticated.PNF_READY";
     private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE";
     private static final String CONSUMER_ID = "so-bpmn-infra-pnfready";
@@ -79,26 +69,23 @@ public class PnfEventReadyDmaapClientTest {
     private PnfEventReadyDmaapClient testedObject;
 
     private DmaapTopicListenerThread testedObjectInnerClassThread;
-    private HttpClient httpClientMock;
+    private KafkaConsumerImpl kafkaConsumerMock;
     private Runnable threadMockToNotifyCamundaFlow;
     private ScheduledThreadPoolExecutor executorMock;
 
     @Before
-    public void init() throws NoSuchFieldException, IllegalAccessException {
-        when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
-        when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
-        when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
-        when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
-        when(env.getProperty(eq("pnf.dmaap.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
-        when(env.getProperty(eq("pnf.dmaap.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
-        when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
-        when(env.getProperty(eq("pnf.dmaap.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
-        when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
+    public void init() throws NoSuchFieldException, IllegalAccessException, IOException {
+        when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS);
+        when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
+        when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
+        when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID);
+        when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
+        when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP);
         when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
         testedObject = new PnfEventReadyDmaapClient(env);
         testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
-        httpClientMock = mock(HttpClient.class);
+        kafkaConsumerMock = mock(KafkaConsumerImpl.class);
         threadMockToNotifyCamundaFlow = mock(Runnable.class);
         executorMock = mock(ScheduledThreadPoolExecutor.class);
         setPrivateField();
@@ -116,17 +103,11 @@ public class PnfEventReadyDmaapClientTest {
      */
     @Test
     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException {
-        when(httpClientMock.execute(any(HttpGet.class)))
-                .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)));
+        when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
+                .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
+                        JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
         testedObjectInnerClassThread.run();
-        ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
-        verify(httpClientMock).execute(captor1.capture());
-        assertEquals(captor1.getValue().getURI().getHost(), HOST);
-        assertEquals(captor1.getValue().getURI().getPort(), PORT);
-        assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
-        assertEquals(captor1.getValue().getURI().getPath(),
-                "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME_UPDATE + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID_UPDATE + "");
-
+        verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE);
         verify(threadMockToNotifyCamundaFlow).run();
         verify(executorMock).shutdown();
     }
@@ -134,20 +115,12 @@ public class PnfEventReadyDmaapClientTest {
 
     @Test
     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
-        ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
-        when(httpClientMock.execute(any(HttpGet.class)))
-                .thenReturn(createResponse_forReady(
-                        String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)))
-                .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)));
+        when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
+                .thenReturn(Collections.emptyList())
+                .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
+                        JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
         testedObjectInnerClassThread.run();
-        verify(httpClientMock, times(2)).execute(captor1.capture());
-        assertEquals(captor1.getValue().getURI().getHost(), HOST);
-        assertEquals(captor1.getValue().getURI().getPort(), PORT);
-        assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
-
-        assertEquals(captor1.getValue().getURI().getPath(),
-                "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
-
+        verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID);
 
         verify(threadMockToNotifyCamundaFlow).run();
         verify(executorMock).shutdown();
@@ -164,8 +137,9 @@ public class PnfEventReadyDmaapClientTest {
      */
     @Test
     public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
-        when(httpClientMock.execute(any(HttpGet.class))).thenReturn(createResponse(
-                String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID_NOT_FOUND_IN_MAP)));
+        when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList(
+                String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP),
+                JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
         testedObjectInnerClassThread.run();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }
@@ -180,17 +154,22 @@ public class PnfEventReadyDmaapClientTest {
      */
     @Test
     public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
-        when(httpClientMock.execute(any(HttpGet.class)))
-                .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
+        when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
+                .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
         testedObjectInnerClassThread.run();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }
 
     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
-        Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
-        httpClientField.setAccessible(true);
-        httpClientField.set(testedObject, httpClientMock);
-        httpClientField.setAccessible(false);
+        Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady");
+        consumerForPnfReadyField.setAccessible(true);
+        consumerForPnfReadyField.set(testedObject, kafkaConsumerMock);
+        consumerForPnfReadyField.setAccessible(false);
+
+        Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate");
+        consumerForPnfUpdateField.setAccessible(true);
+        consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock);
+        consumerForPnfUpdateField.setAccessible(false);
 
         Field executorField = testedObject.getClass().getDeclaredField("executor");
         executorField.setAccessible(true);
@@ -209,22 +188,4 @@ public class PnfEventReadyDmaapClientTest {
         threadRunFlag.setAccessible(false);
     }
 
-    private HttpResponse createResponse(String json) {
-        HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes()));
-        ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
-        HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
-        response.setEntity(entity);
-        response.setStatusCode(200);
-        return response;
-    }
-
-    private HttpResponse createResponse_forReady(String json) {
-        HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes()));
-        ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
-        HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
-        response.setEntity(entity);
-        response.setStatusCode(500);
-        return response;
-    }
-
 }