Merge "bug fixing with reading dmaap message" into casablanca
[so.git] / bpmn / so-bpmn-infrastructure-common / src / test / java / org / onap / so / bpmn / infrastructure / pnf / dmaap / PnfEventReadyDmaapClientTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2018 Nokia.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.infrastructure.pnf.dmaap;
24
25 import static org.assertj.core.api.Assertions.assertThat;
26 import static org.mockito.Matchers.any;
27 import static org.mockito.Matchers.eq;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.verifyZeroInteractions;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.io.UnsupportedEncodingException;
35 import java.lang.reflect.Field;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39
40 import org.apache.http.HttpEntity;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.ProtocolVersion;
43 import org.apache.http.client.HttpClient;
44 import org.apache.http.client.methods.HttpGet;
45 import org.apache.http.entity.StringEntity;
46 import org.apache.http.message.BasicHttpResponse;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.Mock;
52 import org.mockito.runners.MockitoJUnitRunner;
53 import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
54 import org.springframework.core.env.Environment;
55 @RunWith(MockitoJUnitRunner.class)
56 public class PnfEventReadyDmaapClientTest {
57
58     private static final String CORRELATION_ID = "corrTestId";
59     private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
60     private static final String JSON_EXAMPLE_WITH_CORRELATION_ID = "[{\"correlationId\": \"%s\","
61             + "\"value\":\"value1\"},{\"correlationId\": \"corr\",\"value\":\"value2\"}]";
62
63     private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID = "[{\"key1\":\"value1\"}]";
64
65     private static final String HOST = "hostTest";
66     private static final int PORT = 1234;
67     private static final String PROTOCOL = "http";
68     private static final String URI_PATH_PREFIX = "eventsForTesting";
69     private static final String EVENT_TOPIC_TEST = "eventTopicTest";
70     private static final String CONSUMER_ID = "consumerTestId";
71     private static final String CONSUMER_GROUP = "consumerGroupTest";
72     private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
73
74     @Mock
75     private Environment env;
76     private PnfEventReadyDmaapClient testedObject;
77
78     private DmaapTopicListenerThread testedObjectInnerClassThread;
79     private HttpClient httpClientMock;
80     private Runnable threadMockToNotifyCamundaFlow;
81     private ScheduledThreadPoolExecutor executorMock;
82
83     @Before
84     public void init() throws NoSuchFieldException, IllegalAccessException {
85         when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
86         when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
87         when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
88         when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
89         when(env.getProperty(eq("pnf.dmaap.topicName"))).thenReturn(EVENT_TOPIC_TEST);
90         when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
91         when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
92         when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
93                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
94         testedObject = new PnfEventReadyDmaapClient(env);
95         testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
96         httpClientMock = mock(HttpClient.class);
97         threadMockToNotifyCamundaFlow = mock(Runnable.class);
98         executorMock = mock(ScheduledThreadPoolExecutor.class);
99         setPrivateField();
100     }
101
102     /**
103      * Test run method, where the are following conditions:
104      * <p> - DmaapThreadListener is running, flag is set to true
105      * <p> - map is filled with one entry with the key that we get from response
106      * <p> run method should invoke thread from map to notify camunda process, remove element from the map (map is
107      * empty) and shutdown the executor because of empty map
108      */
109     @Test
110     public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady()
111             throws IOException {
112         when(httpClientMock.execute(any(HttpGet.class))).
113                 thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
114         testedObjectInnerClassThread.run();
115         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
116         verify(httpClientMock).execute(captor1.capture());
117         assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
118                 .hasPath(
119                         "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
120         verify(threadMockToNotifyCamundaFlow).run();
121         verify(executorMock).shutdown();
122     }
123
124     /**
125      * Test run method, where the are following conditions:
126      * <p> - DmaapThreadListener is running, flag is set to true
127      * <p> - map is filled with one entry with the correlationId that does not match to correlationId
128      * taken from http response. run method should not do anything with the map not run any thread to notify camunda
129      * process
130      */
131     @Test
132     public void correlationIdIsFoundInHttpResponse_NotFoundInMap()
133             throws IOException {
134         when(httpClientMock.execute(any(HttpGet.class))).
135                 thenReturn(createResponse(
136                         String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
137         testedObjectInnerClassThread.run();
138         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
139     }
140
141     /**
142      * Test run method, where the are following conditions:
143      * <p> - DmaapThreadListener is running, flag is set to true
144      * <p> - map is filled with one entry with the correlationId but no correlation id is taken from HttpResponse
145      * run method should not do anything with the map and not run any thread to notify camunda process
146      */
147     @Test
148     public void correlationIdIsNotFoundInHttpResponse() throws IOException {
149         when(httpClientMock.execute(any(HttpGet.class))).
150                 thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
151         testedObjectInnerClassThread.run();
152         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
153     }
154
155     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
156         Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
157         httpClientField.setAccessible(true);
158         httpClientField.set(testedObject, httpClientMock);
159         httpClientField.setAccessible(false);
160
161         Field executorField = testedObject.getClass().getDeclaredField("executor");
162         executorField.setAccessible(true);
163         executorField.set(testedObject, executorMock);
164         executorField.setAccessible(false);
165
166         Field pnfCorrelationToThreadMapField = testedObject.getClass()
167                 .getDeclaredField("pnfCorrelationIdToThreadMap");
168         pnfCorrelationToThreadMapField.setAccessible(true);
169         Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
170         pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow);
171         pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
172
173         Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
174         threadRunFlag.setAccessible(true);
175         threadRunFlag.set(testedObject, true);
176         threadRunFlag.setAccessible(false);
177     }
178
179     private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
180         HttpEntity entity = new StringEntity(json);
181         ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
182         HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
183         response.setEntity(entity);
184         response.setStatusCode(200);
185         return response;
186     }
187
188 }