X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=bpmn%2Fso-bpmn-infrastructure-common%2Fsrc%2Ftest%2Fjava%2Forg%2Fonap%2Fso%2Fbpmn%2Finfrastructure%2Fpnf%2Fdmaap%2FPnfEventReadyDmaapClientTest.java;h=546e644fbdf1ad80709627c74764e7bba1b01c7f;hb=61b3ff91485571c24834b31c6ee7efc7ab1d0243;hp=bbb6aad49b1aa781af367c038a34c45760a8701f;hpb=e83aa94d93e78e92fbed0c45924fce5aaf2d00c8;p=so.git diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java index bbb6aad49b..546e644fbd 100644 --- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java +++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java @@ -28,45 +28,35 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.entity.InputStreamEntity; -import org.apache.http.message.BasicHttpResponse; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread; +import org.onap.so.client.kafka.KafkaConsumerImpl; import org.springframework.core.env.Environment; + @RunWith(MockitoJUnitRunner.class) public class PnfEventReadyDmaapClientTest { - + private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"; private static final String PNF_CORRELATION_ID = "corrTestId"; private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId"; - private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"%s\"," - + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]"; - - private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]"; + private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = + {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}", + "{\"correlationId\": \"corr\",\"value\":\"value2\"}"}; - private static final String HOST = "hostTest"; - private static final int PORT = 1234; - private static final String PROTOCOL = "http"; - private static final String URI_PATH_PREFIX = "eventsForTesting"; + private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}"; private static final String TOPIC_NAME = "unauthenticated.PNF_READY"; private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE"; private static final String CONSUMER_ID = "so-bpmn-infra-pnfready"; @@ -79,26 +69,23 @@ public class PnfEventReadyDmaapClientTest { private PnfEventReadyDmaapClient testedObject; private DmaapTopicListenerThread testedObjectInnerClassThread; - private HttpClient httpClientMock; + private KafkaConsumerImpl kafkaConsumerMock; private Runnable threadMockToNotifyCamundaFlow; private ScheduledThreadPoolExecutor executorMock; @Before - public void init() throws NoSuchFieldException, IllegalAccessException { - when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT); - when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST); - when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL); - when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX); - when(env.getProperty(eq("pnf.dmaap.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); - when(env.getProperty(eq("pnf.dmaap.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); - when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID); - when(env.getProperty(eq("pnf.dmaap.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); - when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP); + public void init() throws NoSuchFieldException, IllegalAccessException, IOException { + when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS); + when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME); + when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID); + when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE); + when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP); when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class))) .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS); testedObject = new PnfEventReadyDmaapClient(env); testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread(); - httpClientMock = mock(HttpClient.class); + kafkaConsumerMock = mock(KafkaConsumerImpl.class); threadMockToNotifyCamundaFlow = mock(Runnable.class); executorMock = mock(ScheduledThreadPoolExecutor.class); setPrivateField(); @@ -116,17 +103,11 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); - ArgumentCaptor captor1 = ArgumentCaptor.forClass(HttpGet.class); - verify(httpClientMock).execute(captor1.capture()); - assertEquals(captor1.getValue().getURI().getHost(), HOST); - assertEquals(captor1.getValue().getURI().getPort(), PORT); - assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL); - assertEquals(captor1.getValue().getURI().getPath(), - "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME_UPDATE + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID_UPDATE + ""); - + verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE); verify(threadMockToNotifyCamundaFlow).run(); verify(executorMock).shutdown(); } @@ -134,20 +115,12 @@ public class PnfEventReadyDmaapClientTest { @Test public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException { - ArgumentCaptor captor1 = ArgumentCaptor.forClass(HttpGet.class); - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse_forReady( - String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))) - .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Collections.emptyList()) + .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); - verify(httpClientMock, times(2)).execute(captor1.capture()); - assertEquals(captor1.getValue().getURI().getHost(), HOST); - assertEquals(captor1.getValue().getURI().getPort(), PORT); - assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL); - - assertEquals(captor1.getValue().getURI().getPath(), - "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + ""); - + verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID); verify(threadMockToNotifyCamundaFlow).run(); verify(executorMock).shutdown(); @@ -164,8 +137,9 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))).thenReturn(createResponse( - String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID_NOT_FOUND_IN_MAP))); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList( + String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP), + JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1])); testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } @@ -180,17 +154,22 @@ public class PnfEventReadyDmaapClientTest { */ @Test public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException { - when(httpClientMock.execute(any(HttpGet.class))) - .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); + when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))) + .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID)); testedObjectInnerClassThread.run(); verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock); } private void setPrivateField() throws NoSuchFieldException, IllegalAccessException { - Field httpClientField = testedObject.getClass().getDeclaredField("httpClient"); - httpClientField.setAccessible(true); - httpClientField.set(testedObject, httpClientMock); - httpClientField.setAccessible(false); + Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady"); + consumerForPnfReadyField.setAccessible(true); + consumerForPnfReadyField.set(testedObject, kafkaConsumerMock); + consumerForPnfReadyField.setAccessible(false); + + Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate"); + consumerForPnfUpdateField.setAccessible(true); + consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock); + consumerForPnfUpdateField.setAccessible(false); Field executorField = testedObject.getClass().getDeclaredField("executor"); executorField.setAccessible(true); @@ -209,22 +188,4 @@ public class PnfEventReadyDmaapClientTest { threadRunFlag.setAccessible(false); } - private HttpResponse createResponse(String json) { - HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes())); - ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1); - HttpResponse response = new BasicHttpResponse(protocolVersion, 1, ""); - response.setEntity(entity); - response.setStatusCode(200); - return response; - } - - private HttpResponse createResponse_forReady(String json) { - HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes())); - ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1); - HttpResponse response = new BasicHttpResponse(protocolVersion, 1, ""); - response.setEntity(entity); - response.setStatusCode(500); - return response; - } - }