aab01c0a3eaa71a8d413a5933ce0e095149b74b8
[so.git] / bpmn / so-bpmn-infrastructure-common / src / test / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClientTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.Matchers.any;
25 import static org.mockito.Matchers.eq;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyZeroInteractions;
29 import static org.mockito.Mockito.when;
30
31 import java.io.IOException;
32 import java.io.UnsupportedEncodingException;
33 import java.lang.reflect.Field;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ScheduledThreadPoolExecutor;
37
38 import org.apache.http.HttpEntity;
39 import org.apache.http.HttpResponse;
40 import org.apache.http.ProtocolVersion;
41 import org.apache.http.client.HttpClient;
42 import org.apache.http.client.methods.HttpGet;
43 import org.apache.http.entity.StringEntity;
44 import org.apache.http.message.BasicHttpResponse;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.Mock;
50 import org.mockito.runners.MockitoJUnitRunner;
51 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
52 import org.springframework.core.env.Environment;
53 @RunWith(MockitoJUnitRunner.class)
54 public class PnfEventReadyDmaapClientTest {
55
56     private static final String CORRELATION_ID = "corrTestId";
57     private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
58     private static final String JSON_EXAMPLE_WITH_CORRELATION_ID = "[\n"
59             + "    {\n"
60             + "        \"pnfRegistrationFields\" : {\n"
61             + "        \"correlationId\" : \"%s\",\n"
62             + "        \"value\" : \"value1\"\n"
63             + "        }\n"
64             + "    },\n"
65             + "    {\n"
66             + "        \"pnfRegistrationFields\" : {\n"
67             + "        \"correlationId\" : \"corr\",\n"
68             + "        \"value\" : \"value2\"\n"
69             + "        }\n"
70             + "    }\n"
71             + "]";
72     private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID =
73             "{\"pnfRegistrationFields\":{\"field\":\"value\"}}";
74
75     private static final String HOST = "hostTest";
76     private static final int PORT = 1234;
77     private static final String PROTOCOL = "http";
78     private static final String URI_PATH_PREFIX = "eventsForTesting";
79     private static final String EVENT_TOPIC_TEST = "eventTopicTest";
80     private static final String CONSUMER_ID = "consumerTestId";
81     private static final String CONSUMER_GROUP = "consumerGroupTest";
82     private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
83
84     @Mock
85     private Environment env;
86     private PnfEventReadyDmaapClient testedObject;
87
88     private DmaapTopicListenerThread testedObjectInnerClassThread;
89     private HttpClient httpClientMock;
90     private Runnable threadMockToNotifyCamundaFlow;
91     private ScheduledThreadPoolExecutor executorMock;
92
93     @Before
94     public void init() throws NoSuchFieldException, IllegalAccessException {
95         when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
96         when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
97         when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
98         when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
99         when(env.getProperty(eq("pnf.dmaap.topicName"))).thenReturn(EVENT_TOPIC_TEST);
100         when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
101         when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
102         when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
103                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
104         testedObject = new PnfEventReadyDmaapClient(env);
105         testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
106         httpClientMock = mock(HttpClient.class);
107         threadMockToNotifyCamundaFlow = mock(Runnable.class);
108         executorMock = mock(ScheduledThreadPoolExecutor.class);
109         setPrivateField();
110     }
111
112     /**
113      * Test run method, where the are following conditions:
114      * <p> - DmaapThreadListener is running, flag is set to true
115      * <p> - map is filled with one entry with the key that we get from response
116      * <p> run method should invoke thread from map to notify camunda process, remove element from the map (map is
117      * empty) and shutdown the executor because of empty map
118      */
119     @Test
120     public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady()
121             throws IOException {
122         when(httpClientMock.execute(any(HttpGet.class))).
123                 thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
124         testedObjectInnerClassThread.run();
125         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
126         verify(httpClientMock).execute(captor1.capture());
127         assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
128                 .hasPath(
129                         "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
130         verify(threadMockToNotifyCamundaFlow).run();
131         verify(executorMock).shutdown();
132     }
133
134     /**
135      * Test run method, where the are following conditions:
136      * <p> - DmaapThreadListener is running, flag is set to true
137      * <p> - map is filled with one entry with the correlationId that does not match to correlationId
138      * taken from http response. run method should not do anything with the map not run any thread to notify camunda
139      * process
140      */
141     @Test
142     public void correlationIdIsFoundInHttpResponse_NotFoundInMap()
143             throws IOException {
144         when(httpClientMock.execute(any(HttpGet.class))).
145                 thenReturn(createResponse(
146                         String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
147         testedObjectInnerClassThread.run();
148         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
149     }
150
151     /**
152      * Test run method, where the are following conditions:
153      * <p> - DmaapThreadListener is running, flag is set to true
154      * <p> - map is filled with one entry with the correlationId but no correlation id is taken from HttpResponse
155      * run method should not do anything with the map and not run any thread to notify camunda process
156      */
157     @Test
158     public void correlationIdIsNotFoundInHttpResponse() throws IOException {
159         when(httpClientMock.execute(any(HttpGet.class))).
160                 thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
161         testedObjectInnerClassThread.run();
162         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
163     }
164
165     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
166         Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
167         httpClientField.setAccessible(true);
168         httpClientField.set(testedObject, httpClientMock);
169         httpClientField.setAccessible(false);
170
171         Field executorField = testedObject.getClass().getDeclaredField("executor");
172         executorField.setAccessible(true);
173         executorField.set(testedObject, executorMock);
174         executorField.setAccessible(false);
175
176         Field pnfCorrelationToThreadMapField = testedObject.getClass()
177                 .getDeclaredField("pnfCorrelationIdToThreadMap");
178         pnfCorrelationToThreadMapField.setAccessible(true);
179         Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
180         pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow);
181         pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
182
183         Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
184         threadRunFlag.setAccessible(true);
185         threadRunFlag.set(testedObject, true);
186         threadRunFlag.setAccessible(false);
187     }
188
189     private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
190         HttpEntity entity = new StringEntity(json);
191         ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
192         HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
193         response.setEntity(entity);
194         response.setStatusCode(200);
195         return response;
196     }
197
198 }