2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
24 import static org.junit.Assert.*;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Matchers.eq;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.verifyZeroInteractions;
30 import static org.mockito.Mockito.when;
32 import java.io.IOException;
33 import java.io.UnsupportedEncodingException;
34 import java.lang.reflect.Field;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import org.apache.http.HttpEntity;
40 import org.apache.http.HttpResponse;
41 import org.apache.http.ProtocolVersion;
42 import org.apache.http.client.HttpClient;
43 import org.apache.http.client.methods.HttpGet;
44 import org.apache.http.entity.StringEntity;
45 import org.apache.http.message.BasicHttpResponse;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.Mock;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
53 import org.springframework.core.env.Environment;
54 @RunWith(MockitoJUnitRunner.class)
55 public class PnfEventReadyDmaapClientTest {
57 private static final String CORRELATION_ID = "corrTestId";
58 private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
59 private static final String JSON_EXAMPLE_WITH_CORRELATION_ID = "[\n"
61 + " \"pnfRegistrationFields\" : {\n"
62 + " \"correlationId\" : \"%s\",\n"
63 + " \"value\" : \"value1\"\n"
67 + " \"pnfRegistrationFields\" : {\n"
68 + " \"correlationId\" : \"corr\",\n"
69 + " \"value\" : \"value2\"\n"
73 private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID =
74 "{\"pnfRegistrationFields\":{\"field\":\"value\"}}";
76 private static final String HOST = "hostTest";
77 private static final int PORT = 1234;
78 private static final String PROTOCOL = "http";
79 private static final String URI_PATH_PREFIX = "eventsForTesting";
80 private static final String EVENT_TOPIC_TEST = "eventTopicTest";
81 private static final String CONSUMER_ID = "consumerTestId";
82 private static final String CONSUMER_GROUP = "consumerGroupTest";
83 private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
86 private Environment env;
87 private PnfEventReadyDmaapClient testedObject;
89 private DmaapTopicListenerThread testedObjectInnerClassThread;
90 private HttpClient httpClientMock;
91 private Runnable threadMockToNotifyCamundaFlow;
92 private ScheduledThreadPoolExecutor executorMock;
95 public void init() throws NoSuchFieldException, IllegalAccessException {
96 when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
97 when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
98 when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
99 when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
100 when(env.getProperty(eq("pnf.dmaap.topicName"))).thenReturn(EVENT_TOPIC_TEST);
101 when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
102 when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
103 when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
104 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
105 testedObject = new PnfEventReadyDmaapClient(env);
106 testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
107 httpClientMock = mock(HttpClient.class);
108 threadMockToNotifyCamundaFlow = mock(Runnable.class);
109 executorMock = mock(ScheduledThreadPoolExecutor.class);
114 * Test run method, where the are following conditions:
115 * <p> - DmaapThreadListener is running, flag is set to true
116 * <p> - map is filled with one entry with the key that we get from response
117 * <p> run method should invoke thread from map to notify camunda process, remove element from the map (map is
118 * empty) and shutdown the executor because of empty map
121 public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady()
123 when(httpClientMock.execute(any(HttpGet.class))).
124 thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
125 testedObjectInnerClassThread.run();
126 ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
127 verify(httpClientMock).execute(captor1.capture());
129 assertEquals(captor1.getValue().getURI().getHost(),HOST);
130 assertEquals(captor1.getValue().getURI().getPort(),PORT);
131 assertEquals(captor1.getValue().getURI().getScheme(),PROTOCOL);
132 assertEquals(captor1.getValue().getURI().getPath(),"/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
134 verify(threadMockToNotifyCamundaFlow).run();
135 verify(executorMock).shutdown();
139 * Test run method, where the are following conditions:
140 * <p> - DmaapThreadListener is running, flag is set to true
141 * <p> - map is filled with one entry with the correlationId that does not match to correlationId
142 * taken from http response. run method should not do anything with the map not run any thread to notify camunda
146 public void correlationIdIsFoundInHttpResponse_NotFoundInMap()
148 when(httpClientMock.execute(any(HttpGet.class))).
149 thenReturn(createResponse(
150 String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
151 testedObjectInnerClassThread.run();
152 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
156 * Test run method, where the are following conditions:
157 * <p> - DmaapThreadListener is running, flag is set to true
158 * <p> - map is filled with one entry with the correlationId but no correlation id is taken from HttpResponse
159 * run method should not do anything with the map and not run any thread to notify camunda process
162 public void correlationIdIsNotFoundInHttpResponse() throws IOException {
163 when(httpClientMock.execute(any(HttpGet.class))).
164 thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
165 testedObjectInnerClassThread.run();
166 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
169 private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
170 Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
171 httpClientField.setAccessible(true);
172 httpClientField.set(testedObject, httpClientMock);
173 httpClientField.setAccessible(false);
175 Field executorField = testedObject.getClass().getDeclaredField("executor");
176 executorField.setAccessible(true);
177 executorField.set(testedObject, executorMock);
178 executorField.setAccessible(false);
180 Field pnfCorrelationToThreadMapField = testedObject.getClass()
181 .getDeclaredField("pnfCorrelationIdToThreadMap");
182 pnfCorrelationToThreadMapField.setAccessible(true);
183 Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
184 pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow);
185 pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
187 Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
188 threadRunFlag.setAccessible(true);
189 threadRunFlag.set(testedObject, true);
190 threadRunFlag.setAccessible(false);
193 private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
194 HttpEntity entity = new StringEntity(json);
195 ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
196 HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
197 response.setEntity(entity);
198 response.setStatusCode(200);