Merge "Support for SO to ExtAPI"
[so.git] / bpmn / so-bpmn-infrastructure-common / src / test / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClientTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2018 Nokia.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
24
25
26 import static org.junit.Assert.*;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyObject;
29 import static org.mockito.ArgumentMatchers.eq;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.times;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyZeroInteractions;
34 import static org.mockito.Mockito.when;
35 import java.io.IOException;
36 import java.io.UnsupportedEncodingException;
37 import java.lang.reflect.Field;
38 import java.util.Map;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ScheduledThreadPoolExecutor;
41 import org.apache.http.HttpEntity;
42 import org.apache.http.HttpResponse;
43 import org.apache.http.ProtocolVersion;
44 import org.apache.http.client.HttpClient;
45 import org.apache.http.client.methods.HttpGet;
46 import org.apache.http.entity.StringEntity;
47 import org.apache.http.message.BasicHttpResponse;
48 import org.junit.Before;
49 import org.junit.Test;
50 import org.junit.runner.RunWith;
51 import org.mockito.ArgumentCaptor;
52 import org.mockito.InjectMocks;
53 import org.mockito.Mock;
54 import org.mockito.junit.MockitoJUnitRunner;
55 import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
56 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
57 import org.springframework.context.ApplicationEventPublisher;
58 import org.springframework.core.env.Environment;
59
60 @RunWith(MockitoJUnitRunner.class)
61 public class PnfEventReadyDmaapClientTest {
62
63     private static final String PNF_CORRELATION_ID = "corrTestId";
64     private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
65     private static final String JSON_EXAMPLE_WITH_PNF_CORRELATION_ID = "[{\"correlationId\": \"%s\","
66             + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]";
67
68     private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "[{\"key1\":\"value1\"}]";
69
70     private static final String HOST = "hostTest";
71     private static final int PORT = 1234;
72     private static final String PROTOCOL = "http";
73     private static final String URI_PATH_PREFIX = "eventsForTesting";
74     private static final String EVENT_TOPIC_TEST = "eventTopicTest";
75     private static final String CONSUMER_ID = "consumerTestId";
76     private static final String CONSUMER_GROUP = "consumerGroupTest";
77     private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
78
79     @Mock
80     private Environment env;
81     private PnfEventReadyDmaapClient testedObject;
82
83     private DmaapTopicListenerThread testedObjectInnerClassThread;
84     private HttpClient httpClientMock;
85     private Runnable threadMockToNotifyCamundaFlow;
86     private ScheduledThreadPoolExecutor executorMock;
87
88     @Mock
89     private ApplicationEventPublisher applicationEventPublisher;
90
91     @Before
92     public void init() throws NoSuchFieldException, IllegalAccessException {
93         when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
94         when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
95         when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
96         when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
97         when(env.getProperty(eq("pnf.dmaap.topicName"))).thenReturn(EVENT_TOPIC_TEST);
98         when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
99         when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
100         when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
101                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
102         testedObject = new PnfEventReadyDmaapClient(env, applicationEventPublisher);
103         testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
104         httpClientMock = mock(HttpClient.class);
105         threadMockToNotifyCamundaFlow = mock(Runnable.class);
106         executorMock = mock(ScheduledThreadPoolExecutor.class);
107         setPrivateField();
108     }
109
110     /**
111      * Test run method, where the are following conditions:
112      * <p>
113      * - DmaapThreadListener is running, flag is set to true
114      * <p>
115      * - map is filled with one entry with the key that we get from response
116      * <p>
117      * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
118      * and shutdown the executor because of empty map
119      */
120     @Test
121     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
122         when(httpClientMock.execute(any(HttpGet.class)))
123                 .thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID)));
124         testedObjectInnerClassThread.run();
125         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
126         verify(httpClientMock).execute(captor1.capture());
127
128         assertEquals(captor1.getValue().getURI().getHost(), HOST);
129         assertEquals(captor1.getValue().getURI().getPort(), PORT);
130         assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
131         assertEquals(captor1.getValue().getURI().getPath(),
132                 "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
133
134         /**
135          * Two PNF returned from HTTP request.
136          */
137         verify(applicationEventPublisher, times(2)).publishEvent(any(PnfNotificationEvent.class));
138         verify(executorMock).shutdown();
139     }
140
141     /**
142      * Test run method, where the are following conditions:
143      * <p>
144      * - DmaapThreadListener is running, flag is set to true
145      * <p>
146      * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
147      * response. run method should not do anything with the map not run any thread to notify camunda process
148      */
149     @Test
150     public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
151         when(httpClientMock.execute(any(HttpGet.class))).thenReturn(createResponse(
152                 String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID, PNF_CORRELATION_ID_NOT_FOUND_IN_MAP)));
153         testedObjectInnerClassThread.run();
154         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
155     }
156
157     /**
158      * Test run method, where the are following conditions:
159      * <p>
160      * - DmaapThreadListener is running, flag is set to true
161      * <p>
162      * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
163      * method should not do anything with the map and not run any thread to notify camunda process
164      */
165     @Test
166     public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
167         when(httpClientMock.execute(any(HttpGet.class)))
168                 .thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
169         testedObjectInnerClassThread.run();
170         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
171     }
172
173     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
174         Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
175         httpClientField.setAccessible(true);
176         httpClientField.set(testedObject, httpClientMock);
177         httpClientField.setAccessible(false);
178
179         Field executorField = testedObject.getClass().getDeclaredField("executor");
180         executorField.setAccessible(true);
181         executorField.set(testedObject, executorMock);
182         executorField.setAccessible(false);
183
184         Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap");
185         pnfCorrelationToThreadMapField.setAccessible(true);
186         Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
187         pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
188         pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
189
190         Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
191         threadRunFlag.setAccessible(true);
192         threadRunFlag.set(testedObject, true);
193         threadRunFlag.setAccessible(false);
194     }
195
196     private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
197         HttpEntity entity = new StringEntity(json);
198         ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
199         HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
200         response.setEntity(entity);
201         response.setStatusCode(200);
202         return response;
203     }
204
205 }