2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.Matchers.any;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.verifyZeroInteractions;
28 import static org.mockito.Mockito.when;
30 import java.io.IOException;
31 import java.io.UnsupportedEncodingException;
32 import java.lang.reflect.Field;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ScheduledExecutorService;
36 import org.apache.http.HttpEntity;
37 import org.apache.http.HttpResponse;
38 import org.apache.http.ProtocolVersion;
39 import org.apache.http.client.HttpClient;
40 import org.apache.http.client.methods.HttpGet;
41 import org.apache.http.entity.StringEntity;
42 import org.apache.http.message.BasicHttpResponse;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.ArgumentCaptor;
47 public class PnfEventReadyConsumerTest {
// Correlation id that setPrivateField() registers in the consumer's correlation-id-to-thread map.
49 private static final String CORRELATION_ID = "corrTestId";
// Correlation id returned in a response but never registered in the map.
50 private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
// Response body template; the %s placeholder is filled with a correlation id via String.format.
51 private static final String JSON_EXAMPLE_WITH_CORRELATION_ID =
52 "{\"pnfRegistrationFields\":{\"correlationId\":\"%s\"}}";
// Response body that has pnfRegistrationFields but no correlationId entry.
53 private static final String JSON_EXAMPLE_WITH_NO_CORRELATION_ID =
54 "{\"pnfRegistrationFields\":{\"field\":\"value\"}}";
// Connection settings handed to the consumer's setters in init(); the first test
// expects them to show up in the URI of the outgoing HttpGet.
56 private static final String HOST = "hostTest";
57 private static final int PORT = 1234;
58 private static final String PROTOCOL = "http";
59 private static final String URI_PATH_PREFIX = "eventsForTesting";
60 private static final String EVENT_TOPIC_TEST = "eventTopicTest";
61 private static final String CONSUMER_ID = "consumerTestId";
62 private static final String CONSUMER_GROUP = "consumerGroupTest";
// Object under test and the Mockito mocks created for it in init().
64 private PnfEventReadyConsumer testedObject;
65 private HttpClient httpClientMock;
// Runnable stored in the map under CORRELATION_ID; tests verify whether it was run.
66 private Runnable threadMockToNotifyCamundaFlow;
67 private ScheduledExecutorService executorMock;
// Fixture setup: creates a fresh PnfEventReadyConsumer, configures it through its setters
// with the test constants, and creates the Mockito mocks used by the tests.
// NOTE(review): the step that injects these mocks into testedObject is not visible in this
// block - presumably setPrivateField() is invoked after the mocks are created; confirm.
70 public void init() throws NoSuchFieldException, IllegalAccessException {
71 testedObject = new PnfEventReadyConsumer();
72 testedObject.setDmaapHost(HOST);
73 testedObject.setDmaapPort(PORT);
74 testedObject.setDmaapProtocol(PROTOCOL);
75 testedObject.setDmaapUriPathPrefix(URI_PATH_PREFIX);
76 testedObject.setDmaapTopicName(EVENT_TOPIC_TEST);
77 testedObject.setConsumerId(CONSUMER_ID);
78 testedObject.setConsumerGroup(CONSUMER_GROUP);
79 testedObject.setDmaapClientDelayInSeconds(1);
// Mocks for the collaborators that the tests stub and verify against.
81 httpClientMock = mock(HttpClient.class);
82 threadMockToNotifyCamundaFlow = mock(Runnable.class);
83 executorMock = mock(ScheduledExecutorService.class);
88 * Test run method, where there are the following conditions:
89 * <p> - DmaapThreadListener is running, flag is set to true
90 * <p> - map is filled with one entry with the key that we get from response
91 * <p> run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
92 * and shut down the executor because of empty map
// Case: the HTTP response body contains CORRELATION_ID.
// Verifies the URI of the request that was sent, that the mocked Runnable was run,
// and that shutdownNow() was called on the executor mock.
95 public void correlationIdIsFoundInHttpResponse_notifyAboutPnfReady()
// Stub: any HttpGet yields a response whose JSON body carries CORRELATION_ID.
97 when(httpClientMock.execute(any(HttpGet.class))).
98 thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
99 testedObject.sendRequest();
// Capture the HttpGet actually passed to the client so its URI can be inspected.
100 ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
101 verify(httpClientMock).execute(captor1.capture());
// The URI must carry the configured host, port and scheme; the string that follows is
// assembled from the path prefix, topic, consumer group and consumer id.
// NOTE(review): the call consuming that string is not visible here - presumably a path assertion; confirm.
102 assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
104 "/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
// The Runnable mock must have been run and the executor mock shut down.
105 verify(threadMockToNotifyCamundaFlow).run();
106 verify(executorMock).shutdownNow();
110 * Test run method, where there are the following conditions:
111 * <p> - DmaapThreadListener is running, flag is set to true
112 * <p> - map is filled with one entry with the correlationId that does not match the correlationId
113 * taken from http response. run method should not do anything with the map nor run any thread to
114 * notify camunda process
// Case: the HTTP response body contains a correlation id (CORRELATION_ID_NOT_FOUND_IN_MAP)
// different from CORRELATION_ID. Expects no interaction with the Runnable mock or the executor mock.
117 public void correlationIdIsFoundInHttpResponse_NotFoundInMap()
// Stub: any HttpGet yields a response carrying CORRELATION_ID_NOT_FOUND_IN_MAP.
119 when(httpClientMock.execute(any(HttpGet.class))).
120 thenReturn(createResponse(
121 String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
122 testedObject.sendRequest();
// Neither mock may have been touched.
123 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
127 * Test run method, where there are the following conditions:
128 * <p> - DmaapThreadListener is running, flag is set to true
129 * <p> - map is filled with one entry with the correlationId but no correlation id is taken from HttpResponse
130 * run method should not do anything with the map and not run any thread to notify camunda process
// Case: the HTTP response body has no correlationId field at all.
// Expects no interaction with the Runnable mock or the executor mock.
133 public void correlationIdIsNotFoundInHttpResponse() throws IOException {
// Stub: any HttpGet yields the JSON body that lacks a correlationId entry.
134 when(httpClientMock.execute(any(HttpGet.class))).
135 thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
136 testedObject.sendRequest();
// Neither mock may have been touched.
137 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
// Uses reflection to overwrite four private fields declared on testedObject's class:
// "httpClient", "executor", "pnfCorrelationIdToThreadMap" and "dmaapThreadListenerIsRunning".
// Throws NoSuchFieldException if a field name does not exist on the class, and
// IllegalAccessException if a field cannot be written.
140 private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
// Replace the HTTP client with the mock so tests can stub execute(...).
141 Field httpClientField = testedObject.getClass().getDeclaredField("httpClient");
142 httpClientField.setAccessible(true);
143 httpClientField.set(testedObject, httpClientMock);
144 httpClientField.setAccessible(false);
// Replace the executor with the mock so tests can verify calls on it.
146 Field executorField = testedObject.getClass().getDeclaredField("executor");
147 executorField.setAccessible(true);
148 executorField.set(testedObject, executorMock);
149 executorField.setAccessible(false);
// Install a map holding exactly one entry: CORRELATION_ID -> threadMockToNotifyCamundaFlow.
// NOTE(review): unlike the other three fields, accessibility is not reset to false here.
151 Field pnfCorrelationToThreadMapField = testedObject.getClass()
152 .getDeclaredField("pnfCorrelationIdToThreadMap");
153 pnfCorrelationToThreadMapField.setAccessible(true);
154 Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
155 pnfCorrelationToThreadMap.put(CORRELATION_ID, threadMockToNotifyCamundaFlow);
156 pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
// Set the "listener is running" flag to true.
158 Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
159 threadRunFlag.setAccessible(true);
160 threadRunFlag.set(testedObject, true);
161 threadRunFlag.setAccessible(false);
164 private HttpResponse createResponse(String json) throws UnsupportedEncodingException {
165 HttpEntity entity = new StringEntity(json);
166 ProtocolVersion protocolVersion = new ProtocolVersion("", 1, 1);
167 HttpResponse response = new BasicHttpResponse(protocolVersion, 1, "");
168 response.setEntity(entity);
169 response.setStatusCode(200);