/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Copyright (C) 2018 Nokia.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.so.bpmn.infrastructure.pnf.dmaap;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
import org.onap.so.client.kafka.KafkaConsumerImpl;
import org.springframework.core.env.Environment;

50 @RunWith(MockitoJUnitRunner.class)
51 public class PnfEventReadyDmaapClientTest {
52 private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
53 private static final String PNF_CORRELATION_ID = "corrTestId";
54 private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
55 private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID =
56 {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}",
57 "{\"correlationId\": \"corr\",\"value\":\"value2\"}"};
59 private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}";
60 private static final String TOPIC_NAME = "unauthenticated.PNF_READY";
61 private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE";
62 private static final String CONSUMER_ID = "so-bpmn-infra-pnfready";
63 private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate";
64 private static final String CONSUMER_GROUP = "so-consumer";
65 private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
68 private Environment env;
69 private PnfEventReadyDmaapClient testedObject;
71 private DmaapTopicListenerThread testedObjectInnerClassThread;
72 private KafkaConsumerImpl kafkaConsumerMock;
73 private Runnable threadMockToNotifyCamundaFlow;
74 private ScheduledThreadPoolExecutor executorMock;
77 public void init() throws NoSuchFieldException, IllegalAccessException, IOException {
78 when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS);
79 when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
80 when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
81 when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID);
82 when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
83 when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP);
84 when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
85 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
86 testedObject = new PnfEventReadyDmaapClient(env);
87 testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
88 kafkaConsumerMock = mock(KafkaConsumerImpl.class);
89 threadMockToNotifyCamundaFlow = mock(Runnable.class);
90 executorMock = mock(ScheduledThreadPoolExecutor.class);
95 * Test run method, where the are following conditions:
97 * - DmaapThreadListener is running, flag is set to true
99 * - map is filled with one entry with the key that we get from response
101 * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
102 * and shutdown the executor because of empty map
105 public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException {
106 when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
107 .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
108 JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
109 testedObjectInnerClassThread.run();
110 verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE);
111 verify(threadMockToNotifyCamundaFlow).run();
112 verify(executorMock).shutdown();
117 public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
118 when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
119 .thenReturn(Collections.emptyList())
120 .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
121 JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
122 testedObjectInnerClassThread.run();
123 verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID);
125 verify(threadMockToNotifyCamundaFlow).run();
126 verify(executorMock).shutdown();
131 * Test run method, where the are following conditions:
133 * - DmaapThreadListener is running, flag is set to true
135 * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
136 * response. run method should not do anything with the map not run any thread to notify camunda process
139 public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
140 when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList(
141 String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP),
142 JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
143 testedObjectInnerClassThread.run();
144 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
148 * Test run method, where the are following conditions:
150 * - DmaapThreadListener is running, flag is set to true
152 * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
153 * method should not do anything with the map and not run any thread to notify camunda process
156 public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
157 when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
158 .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
159 testedObjectInnerClassThread.run();
160 verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
163 private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
164 Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady");
165 consumerForPnfReadyField.setAccessible(true);
166 consumerForPnfReadyField.set(testedObject, kafkaConsumerMock);
167 consumerForPnfReadyField.setAccessible(false);
169 Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate");
170 consumerForPnfUpdateField.setAccessible(true);
171 consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock);
172 consumerForPnfUpdateField.setAccessible(false);
174 Field executorField = testedObject.getClass().getDeclaredField("executor");
175 executorField.setAccessible(true);
176 executorField.set(testedObject, executorMock);
177 executorField.setAccessible(false);
179 Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap");
180 pnfCorrelationToThreadMapField.setAccessible(true);
181 Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
182 pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
183 pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
185 Field threadRunFlag = testedObject.getClass().getDeclaredField("dmaapThreadListenerIsRunning");
186 threadRunFlag.setAccessible(true);
187 threadRunFlag.set(testedObject, true);
188 threadRunFlag.setAccessible(false);