c870762cdf76410878aadd4c09095888ac66cc62
[so.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2018 Nokia.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.infrastructure.pnf.kafka;
24
25
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.ArgumentMatchers.eq;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.verifyZeroInteractions;
31 import static org.mockito.Mockito.when;
32 import java.io.IOException;
33 import java.lang.reflect.Field;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.junit.runner.RunWith;
42 import org.mockito.Mock;
43 import org.mockito.junit.MockitoJUnitRunner;
44 import org.onap.so.bpmn.infrastructure.pnf.kafka.PnfEventReadyKafkaClient.KafkaTopicListenerThread;
45 import org.onap.so.client.kafka.KafkaConsumerImpl;
46 import org.springframework.core.env.Environment;
47
48
49 @RunWith(MockitoJUnitRunner.class)
50 public class PnfEventReadyKafkaClientTest {
51     private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
52     private static final String PNF_CORRELATION_ID = "corrTestId";
53     private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
54     private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID =
55             {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}",
56                     "{\"correlationId\": \"corr\",\"value\":\"value2\"}"};
57
58     private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}";
59     private static final String TOPIC_NAME = "unauthenticated.PNF_READY";
60     private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE";
61     private static final String CONSUMER_ID = "so-bpmn-infra-pnfready";
62     private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate";
63     private static final String CONSUMER_GROUP = "so-consumer";
64     private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
65
66     @Mock
67     private Environment env;
68     private PnfEventReadyKafkaClient testedObject;
69
70     private KafkaTopicListenerThread testedObjectInnerClassThread;
71     private KafkaConsumerImpl kafkaConsumerMock;
72     private Runnable threadMockToNotifyCamundaFlow;
73     private ScheduledThreadPoolExecutor executorMock;
74
75     @Before
76     public void init() throws NoSuchFieldException, IllegalAccessException, IOException {
77         when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS);
78         when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
79         when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
80         when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID);
81         when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
82         when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP);
83         when(env.getProperty(eq("pnf.kafka.topicListenerDelayInSeconds"), eq(Integer.class)))
84                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
85         testedObject = new PnfEventReadyKafkaClient(env);
86         testedObjectInnerClassThread = testedObject.new KafkaTopicListenerThread();
87         kafkaConsumerMock = mock(KafkaConsumerImpl.class);
88         threadMockToNotifyCamundaFlow = mock(Runnable.class);
89         executorMock = mock(ScheduledThreadPoolExecutor.class);
90         setPrivateField();
91     }
92
93     /**
94      * Test run method, where the are following conditions:
95      * <p>
96      * - KafkaThreadListener is running, flag is set to true
97      * <p>
98      * - map is filled with one entry with the key that we get from response
99      * <p>
100      * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
101      * and shutdown the executor because of empty map
102      */
103     @Test
104     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException {
105         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
106                 .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
107                         JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
108         testedObjectInnerClassThread.run();
109         verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE);
110         verify(threadMockToNotifyCamundaFlow).run();
111         verify(executorMock).shutdown();
112     }
113
114
115     @Test
116     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
117         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
118                 .thenReturn(Collections.emptyList())
119                 .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
120                         JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
121         testedObjectInnerClassThread.run();
122         verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID);
123
124         verify(threadMockToNotifyCamundaFlow).run();
125         verify(executorMock).shutdown();
126     }
127
128
129     /**
130      * Test run method, where the are following conditions:
131      * <p>
132      * - KafkaThreadListener is running, flag is set to true
133      * <p>
134      * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
135      * response. run method should not do anything with the map not run any thread to notify camunda process
136      */
137     @Test
138     public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
139         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList(
140                 String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP),
141                 JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
142         testedObjectInnerClassThread.run();
143         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
144     }
145
146     /**
147      * Test run method, where the are following conditions:
148      * <p>
149      * - KafkaThreadListener is running, flag is set to true
150      * <p>
151      * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
152      * method should not do anything with the map and not run any thread to notify camunda process
153      */
154     @Test
155     public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
156         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
157                 .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
158         testedObjectInnerClassThread.run();
159         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
160     }
161
162     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
163         Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady");
164         consumerForPnfReadyField.setAccessible(true);
165         consumerForPnfReadyField.set(testedObject, kafkaConsumerMock);
166         consumerForPnfReadyField.setAccessible(false);
167
168         Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate");
169         consumerForPnfUpdateField.setAccessible(true);
170         consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock);
171         consumerForPnfUpdateField.setAccessible(false);
172
173         Field executorField = testedObject.getClass().getDeclaredField("executor");
174         executorField.setAccessible(true);
175         executorField.set(testedObject, executorMock);
176         executorField.setAccessible(false);
177
178         Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap");
179         pnfCorrelationToThreadMapField.setAccessible(true);
180         Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
181         pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
182         pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
183
184         Field threadRunFlag = testedObject.getClass().getDeclaredField("kafkaThreadListenerIsRunning");
185         threadRunFlag.setAccessible(true);
186         threadRunFlag.set(testedObject, true);
187         threadRunFlag.setAccessible(false);
188     }
189
190 }