2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2018 Nokia.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
26 import static org.junit.Assert.assertEquals;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.eq;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verifyZeroInteractions;
33 import static org.mockito.Mockito.when;
34 import java.io.ByteArrayInputStream;
35 import java.io.IOException;
36 import java.lang.reflect.Field;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ScheduledThreadPoolExecutor;
40 import org.apache.http.HttpEntity;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.ProtocolVersion;
43 import org.apache.http.client.HttpClient;
44 import org.apache.http.client.methods.HttpGet;
45 import org.apache.http.entity.InputStreamEntity;
46 import org.apache.http.message.BasicHttpResponse;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.Mock;
52 import org.mockito.junit.MockitoJUnitRunner;
53 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
54 import org.springframework.core.env.Environment;
56 @RunWith(MockitoJUnitRunner.class)
57 public class PnfEventReadyDmaapClientTest {
59 private static final String PNF_CORRELATION_ID = "corrTestId";
60 private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
61 private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"%s\","
62 + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]";
64 private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]";
66 private static final String HOST = "hostTest";
67 private static final int PORT = 1234;
68 private static final String PROTOCOL = "http";
69 private static final String URI_PATH_PREFIX = "eventsForTesting";
70 private static final String TOPIC_NAME = "unauthenticated.PNF_READY";
71 private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE";
72 private static final String CONSUMER_ID = "so-bpmn-infra-pnfready";
73 private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate";
74 private static final String CONSUMER_GROUP = "so-consumer";
75 private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
78 private Environment env;
79 private PnfEventReadyDmaapClient testedObject;
81 private DmaapTopicListenerThread testedObjectInnerClassThread;
82 private HttpClient httpClientMock;
83 private Runnable threadMockToNotifyCamundaFlow;
84 private ScheduledThreadPoolExecutor executorMock;
87 public void init() throws NoSuchFieldException, IllegalAccessException {
88 when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
89 when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
90 when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
91 when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
92 when(env.getProperty(eq("pnf.dmaap.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
93 when(env.getProperty(eq("pnf.dmaap.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
94 when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
95 when(env.getProperty(eq("pnf.dmaap.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
96 when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
97 when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
98 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
99 testedObject = new PnfEventReadyDmaapClient(env);
100 testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
101 httpClientMock = mock(HttpClient.class);
102 threadMockToNotifyCamundaFlow = mock(Runnable.class);
103 executorMock = mock(ScheduledThreadPoolExecutor.class);
108 * Test run method, where the are following conditions:
110 * - DmaapThreadListener is running, flag is set to true
112 * - map is filled with one entry with the key that we get from response
114 * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
115 * and shutdown the executor because of empty map
118 public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException {
119 when(httpClientMock.execute(any(HttpGet.class)))
120 .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)));
121 testedObjectInnerClassThread.run();
122 ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
123 verify(httpClientMock).execute(captor1.capture());
124 assertEquals(captor1.getValue().getURI().getHost(), HOST);
125 assertEquals(captor1.getValue().getURI().getPort(), PORT);
126 assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
127 assertEquals(captor1.getValue().getURI().getPath(),
128 "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME_UPDATE + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID_UPDATE + "");
130 verify(threadMockToNotifyCamundaFlow).run();
131 verify(executorMock).shutdown();
136 public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
137 ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
138 when(httpClientMock.execute(any(HttpGet.class)))
139 .thenReturn(createResponse_forReady(
140 String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)))
141 .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)));
142 testedObjectInnerClassThread.run();
143 verify(httpClientMock, times(2)).execute(captor1.capture());
144 assertEquals(captor1.getValue().getURI().getHost(), HOST);
145 assertEquals(captor1.getValue().getURI().getPort(), PORT);
146 assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
148 assertEquals(captor1.getValue().getURI().getPath(),
149 "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
152 verify(threadMockToNotifyCamundaFlow).run();
153 verify(executorMock).shutdown();
158 * Test run method, where the are following conditions:
160 * - DmaapThreadListener is running, flag is set to true
162 * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
163 * response. run method should not do anything with the map not run any thread to notify camunda process
166 public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
167 when(httpClientMock.execute(any(HttpGet.class))).thenReturn(createResponse(
168 String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID_NOT_FOUND_IN_MAP)));
169 testedObjectInnerClassThread.run();
170 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
174 * Test run method, where the are following conditions:
176 * - DmaapThreadListener is running, flag is set to true
178 * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
179 * method should not do anything with the map and not run any thread to notify camunda process
182 public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
183 when(httpClientMock.execute(any(HttpGet.class)))
184 .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
185 testedObjectInnerClassThread.run();
186 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
189 private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
190 Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
191 httpClientField.setAccessible(true);
192 httpClientField.set(testedObject, httpClientMock);
193 httpClientField.setAccessible(false);
195 Field executorField = testedObject.getClass().getDeclaredField("executor");
196 executorField.setAccessible(true);
197 executorField.set(testedObject, executorMock);
198 executorField.setAccessible(false);
200 Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap");
201 pnfCorrelationToThreadMapField.setAccessible(true);
202 Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
203 pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
204 pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
206 Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
207 threadRunFlag.setAccessible(true);
208 threadRunFlag.set(testedObject, true);
209 threadRunFlag.setAccessible(false);
212 private HttpResponse createResponse(String json) {
213 HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes()));
214 ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
215 HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
216 response.setEntity(entity);
217 response.setStatusCode(200);
221 private HttpResponse createResponse_forReady(String json) {
222 HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes()));
223 ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
224 HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
225 response.setEntity(entity);
226 response.setStatusCode(500);