PNF-READY Events not received by DmaapListener in TestScenarios
[so.git] / bpmn / so-bpmn-infrastructure-common / src / test / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClientTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2018 Nokia.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
24
25
26 import static org.junit.Assert.assertEquals;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.eq;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verifyZeroInteractions;
33 import static org.mockito.Mockito.when;
34 import java.io.ByteArrayInputStream;
35 import java.io.IOException;
36 import java.lang.reflect.Field;
37 import java.util.Map;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ScheduledThreadPoolExecutor;
40 import org.apache.http.HttpEntity;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.ProtocolVersion;
43 import org.apache.http.client.HttpClient;
44 import org.apache.http.client.methods.HttpGet;
45 import org.apache.http.entity.InputStreamEntity;
46 import org.apache.http.message.BasicHttpResponse;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.Mock;
52 import org.mockito.junit.MockitoJUnitRunner;
53 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
54 import org.springframework.core.env.Environment;
55
56 @RunWith(MockitoJUnitRunner.class)
57 public class PnfEventReadyDmaapClientTest {
58
59     private static final String PNF_CORRELATION_ID = "corrTestId";
60     private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
61     private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"%s\","
62             + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]";
63
64     private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]";
65
66     private static final String HOST = "hostTest";
67     private static final int PORT = 1234;
68     private static final String PROTOCOL = "http";
69     private static final String URI_PATH_PREFIX = "eventsForTesting";
70     private static final String TOPIC_NAME = "PNF_READY_Test PNF_UPDATE_Test";
71     private static final String CONSUMER_ID = "consumerTestId";
72     private static final String CONSUMER_GROUP = "consumerGroupTest";
73     private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
74
75     @Mock
76     private Environment env;
77     private PnfEventReadyDmaapClient testedObject;
78
79     private DmaapTopicListenerThread testedObjectInnerClassThread;
80     private HttpClient httpClientMock;
81     private Runnable threadMockToNotifyCamundaFlow;
82     private ScheduledThreadPoolExecutor executorMock;
83
84     @Before
85     public void init() throws NoSuchFieldException, IllegalAccessException {
86         when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
87         when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
88         when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
89         when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
90         when(env.getProperty(eq("pnf.dmaap.topicName"))).thenReturn(TOPIC_NAME);
91         when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
92         when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
93         when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
94                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
95         testedObject = new PnfEventReadyDmaapClient(env);
96         testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
97         httpClientMock = mock(HttpClient.class);
98         threadMockToNotifyCamundaFlow = mock(Runnable.class);
99         executorMock = mock(ScheduledThreadPoolExecutor.class);
100         setPrivateField();
101     }
102
103     /**
104      * Test run method, where the are following conditions:
105      * <p>
106      * - DmaapThreadListener is running, flag is set to true
107      * <p>
108      * - map is filled with one entry with the key that we get from response
109      * <p>
110      * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
111      * and shutdown the executor because of empty map
112      */
113     @Test
114     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException {
115         when(httpClientMock.execute(any(HttpGet.class)))
116                 .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)));
117         testedObjectInnerClassThread.run();
118         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
119         verify(httpClientMock).execute(captor1.capture());
120         assertEquals(captor1.getValue().getURI().getHost(), HOST);
121         assertEquals(captor1.getValue().getURI().getPort(), PORT);
122         assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
123         String[] topic = TOPIC_NAME.split("\\s");
124         String pnf_update = null;
125         for (String t : topic) {
126             if (t.matches("(.*)PNF_UPDATE(.*)")) {
127                 pnf_update = t;
128                 assertEquals(captor1.getValue().getURI().getPath(),
129                         "/" + URI_PATH_PREFIX + "/" + pnf_update + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
130             }
131         }
132         verify(threadMockToNotifyCamundaFlow).run();
133         verify(executorMock).shutdown();
134     }
135
136
137     @Test
138     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
139         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
140         when(httpClientMock.execute(any(HttpGet.class)))
141                 .thenReturn(createResponse_forReady(
142                         String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)))
143                 .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)));
144         testedObjectInnerClassThread.run();
145         verify(httpClientMock, times(2)).execute(captor1.capture());
146         assertEquals(captor1.getValue().getURI().getHost(), HOST);
147         assertEquals(captor1.getValue().getURI().getPort(), PORT);
148         assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
149         String[] topic = TOPIC_NAME.split("\\s");
150         String pnf_ready = null;
151         for (String t : topic) {
152             if (t.matches("(.*)PNF_READY(.*)")) {
153                 pnf_ready = t;
154                 assertEquals(captor1.getValue().getURI().getPath(),
155                         "/" + URI_PATH_PREFIX + "/" + pnf_ready + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
156             }
157         }
158         verify(threadMockToNotifyCamundaFlow).run();
159         verify(executorMock).shutdown();
160     }
161
162
163     /**
164      * Test run method, where the are following conditions:
165      * <p>
166      * - DmaapThreadListener is running, flag is set to true
167      * <p>
168      * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
169      * response. run method should not do anything with the map not run any thread to notify camunda process
170      */
171     @Test
172     public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
173         when(httpClientMock.execute(any(HttpGet.class))).thenReturn(createResponse(
174                 String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID_NOT_FOUND_IN_MAP)));
175         testedObjectInnerClassThread.run();
176         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
177     }
178
179     /**
180      * Test run method, where the are following conditions:
181      * <p>
182      * - DmaapThreadListener is running, flag is set to true
183      * <p>
184      * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
185      * method should not do anything with the map and not run any thread to notify camunda process
186      */
187     @Test
188     public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
189         when(httpClientMock.execute(any(HttpGet.class)))
190                 .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
191         testedObjectInnerClassThread.run();
192         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
193     }
194
195     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
196         Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
197         httpClientField.setAccessible(true);
198         httpClientField.set(testedObject, httpClientMock);
199         httpClientField.setAccessible(false);
200
201         Field executorField = testedObject.getClass().getDeclaredField("executor");
202         executorField.setAccessible(true);
203         executorField.set(testedObject, executorMock);
204         executorField.setAccessible(false);
205
206         Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap");
207         pnfCorrelationToThreadMapField.setAccessible(true);
208         Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
209         pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
210         pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
211
212         Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
213         threadRunFlag.setAccessible(true);
214         threadRunFlag.set(testedObject, true);
215         threadRunFlag.setAccessible(false);
216     }
217
218     private HttpResponse createResponse(String json) {
219         HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes()));
220         ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
221         HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
222         response.setEntity(entity);
223         response.setStatusCode(200);
224         return response;
225     }
226
227     private HttpResponse createResponse_forReady(String json) {
228         HttpEntity entity = new InputStreamEntity(new ByteArrayInputStream(json.getBytes()));
229         ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
230         HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
231         response.setEntity(entity);
232         response.setStatusCode(500);
233         return response;
234     }
235
236 }
237