* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Copyright (C) 2018 Nokia.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
package org.onap.so.bpmn.infrastructure.pnf.dmaap;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.ProtocolVersion;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.message.BasicHttpResponse;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.junit.MockitoJUnitRunner;
import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
+import org.onap.so.client.kafka.KafkaConsumerImpl;
import org.springframework.core.env.Environment;
+
+
@RunWith(MockitoJUnitRunner.class)
public class PnfEventReadyDmaapClientTest {
+ private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final String PNF_CORRELATION_ID = "corrTestId";
+ private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
+ private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID =
+ {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}",
+ "{\"correlationId\": \"corr\",\"value\":\"value2\"}"};
+
+ private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}";
+ private static final String TOPIC_NAME = "unauthenticated.PNF_READY";
+ private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE";
+ private static final String CONSUMER_ID = "so-bpmn-infra-pnfready";
+ private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate";
+ private static final String CONSUMER_GROUP = "so-consumer";
+ private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
- private static final String CORRELATION_ID = "corrTestId";
- private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
- private static final String JSON_EXAMPLE_WITH_CORRELATION_ID = "[\n"
- + " {\n"
- + " \"pnfRegistrationFields\" : {\n"
- + " \"correlationId\" : \"%s\",\n"
- + " \"value\" : \"value1\"\n"
- + " }\n"
- + " },\n"
- + " {\n"
- + " \"pnfRegistrationFields\" : {\n"
- + " \"correlationId\" : \"corr\",\n"
- + " \"value\" : \"value2\"\n"
- + " }\n"
- + " }\n"
- + "]";
- private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID =
- "{\"pnfRegistrationFields\":{\"field\":\"value\"}}";
-
- private static final String HOST = "hostTest";
- private static final int PORT = 1234;
- private static final String PROTOCOL = "http";
- private static final String URI_PATH_PREFIX = "eventsForTesting";
- private static final String EVENT_TOPIC_TEST = "eventTopicTest";
- private static final String CONSUMER_ID = "consumerTestId";
- private static final String CONSUMER_GROUP = "consumerGroupTest";
@Mock
private Environment env;
private PnfEventReadyDmaapClient testedObject;
private DmaapTopicListenerThread testedObjectInnerClassThread;
- private HttpClient httpClientMock;
+ private KafkaConsumerImpl kafkaConsumerMock;
private Runnable threadMockToNotifyCamundaFlow;
private ScheduledThreadPoolExecutor executorMock;
@Before
- public void init() throws NoSuchFieldException, IllegalAccessException {
+ public void init() throws NoSuchFieldException, IllegalAccessException, IOException {
+ when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS);
+ when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
+ when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
+ when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID);
+ when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
+ when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP);
+ when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
+ .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
testedObject = new PnfEventReadyDmaapClient(env);
- when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
- when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
- testedObject.setDmaapProtocol(PROTOCOL);
- testedObject.setDmaapUriPathPrefix(URI_PATH_PREFIX);
- testedObject.setDmaapTopicName(EVENT_TOPIC_TEST);
- testedObject.setConsumerId(CONSUMER_ID);
- testedObject.setConsumerGroup(CONSUMER_GROUP);
- testedObject.setDmaapClientDelayInSeconds(1);
- testedObject.init();
testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
- httpClientMock = mock(HttpClient.class);
+ kafkaConsumerMock = mock(KafkaConsumerImpl.class);
threadMockToNotifyCamundaFlow = mock(Runnable.class);
executorMock = mock(ScheduledThreadPoolExecutor.class);
setPrivateField();
/**
* Test run method, where the are following conditions:
- * <p> - DmaapThreadListener is running, flag is set to true
- * <p> - map is filled with one entry with the key that we get from response
- * <p> run method should invoke thread from map to notify camunda process, remove element from the map (map is
- * empty) and shutdown the executor because of empty map
+ * <p>
+ * - DmaapThreadListener is running, flag is set to true
+ * <p>
+ * - map is filled with one entry with the key that we get from response
+ * <p>
+ * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
+ * and shutdown the executor because of empty map
*/
@Test
- public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady()
- throws IOException {
- when(httpClientMock.execute(any(HttpGet.class))).
- thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
+ public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException {
+ when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
+ .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
+ JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
+ testedObjectInnerClassThread.run();
+ verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE);
+ verify(threadMockToNotifyCamundaFlow).run();
+ verify(executorMock).shutdown();
+ }
+
+
+ @Test
+ public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
+ when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
+ .thenReturn(Collections.emptyList())
+ .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
+ JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
testedObjectInnerClassThread.run();
- ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
- verify(httpClientMock).execute(captor1.capture());
- assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
- .hasPath(
- "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
+ verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID);
+
verify(threadMockToNotifyCamundaFlow).run();
verify(executorMock).shutdown();
}
+
/**
* Test run method, where the are following conditions:
- * <p> - DmaapThreadListener is running, flag is set to true
- * <p> - map is filled with one entry with the correlationId that does not match to correlationId
- * taken from http response. run method should not do anything with the map not run any thread to notify camunda
- * process
+ * <p>
+ * - DmaapThreadListener is running, flag is set to true
+ * <p>
+ * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
+ * response. run method should not do anything with the map not run any thread to notify camunda process
*/
@Test
- public void correlationIdIsFoundInHttpResponse_NotFoundInMap()
- throws IOException {
- when(httpClientMock.execute(any(HttpGet.class))).
- thenReturn(createResponse(
- String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
+ public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
+ when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList(
+ String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP),
+ JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
testedObjectInnerClassThread.run();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}
/**
* Test run method, where the are following conditions:
- * <p> - DmaapThreadListener is running, flag is set to true
- * <p> - map is filled with one entry with the correlationId but no correlation id is taken from HttpResponse
- * run method should not do anything with the map and not run any thread to notify camunda process
+ * <p>
+ * - DmaapThreadListener is running, flag is set to true
+ * <p>
+ * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
+ * method should not do anything with the map and not run any thread to notify camunda process
*/
@Test
- public void correlationIdIsNotFoundInHttpResponse() throws IOException {
- when(httpClientMock.execute(any(HttpGet.class))).
- thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
+ public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
+ when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
+ .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
testedObjectInnerClassThread.run();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}
private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
- Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
- httpClientField.setAccessible(true);
- httpClientField.set(testedObject, httpClientMock);
- httpClientField.setAccessible(false);
+ Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady");
+ consumerForPnfReadyField.setAccessible(true);
+ consumerForPnfReadyField.set(testedObject, kafkaConsumerMock);
+ consumerForPnfReadyField.setAccessible(false);
+
+ Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate");
+ consumerForPnfUpdateField.setAccessible(true);
+ consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock);
+ consumerForPnfUpdateField.setAccessible(false);
Field executorField = testedObject.getClass().getDeclaredField("executor");
executorField.setAccessible(true);
executorField.set(testedObject, executorMock);
executorField.setAccessible(false);
- Field pnfCorrelationToThreadMapField = testedObject.getClass()
- .getDeclaredField("pnfCorrelationIdToThreadMap");
+ Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap");
pnfCorrelationToThreadMapField.setAccessible(true);
Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
- pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow);
+ pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
threadRunFlag.setAccessible(false);
}
- private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
- HttpEntity entity = new StringEntity(json);
- ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
- HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
- response.setEntity(entity);
- response.setStatusCode(200);
- return response;
- }
-
}