update link to upper-constraints.txt
[so.git] / bpmn / so-bpmn-infrastructure-common / src / test / java / org / onap / so / bpmn / infrastructure / pnf / kafka / PnfEventReadyKafkaClientTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2018 Nokia.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.infrastructure.pnf.kafka;
24
25
26 import static org.junit.Assert.assertEquals;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.eq;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyZeroInteractions;
32 import static org.mockito.Mockito.when;
33 import java.io.IOException;
34 import java.lang.reflect.Field;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.Map;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ScheduledThreadPoolExecutor;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.junit.runner.RunWith;
43 import org.mockito.Mock;
44 import org.mockito.junit.MockitoJUnitRunner;
45 import org.onap.so.bpmn.infrastructure.pnf.kafka.PnfEventReadyKafkaClient.KafkaTopicListenerThread;
46 import org.onap.so.client.kafka.KafkaConsumerImpl;
47 import org.springframework.core.env.Environment;
48
49
50 @RunWith(MockitoJUnitRunner.class)
51 public class PnfEventReadyKafkaClientTest {
52     private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
53     private static final String PNF_CORRELATION_ID = "corrTestId";
54     private static final String PNF_CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
55     private static final String[] JSON_EXAMPLE_WITH_PNF_CORRELATION_ID =
56             {"{\"correlationId\": \"%s\"," + "\"value\":\"value1\"}",
57                     "{\"correlationId\": \"corr\",\"value\":\"value2\"}"};
58
59     private static final String JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID = "{\"key1\":\"value1\"}";
60     private static final String TOPIC_NAME = "unauthenticated.PNF_READY";
61     private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE";
62     private static final String CONSUMER_ID = "so-bpmn-infra-pnfready";
63     private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate";
64     private static final String CONSUMER_GROUP = "so-consumer";
65     private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
66
67     @Mock
68     private Environment env;
69     private PnfEventReadyKafkaClient testedObject;
70
71     private KafkaTopicListenerThread testedObjectInnerClassThread;
72     private KafkaConsumerImpl kafkaConsumerMock;
73     private Runnable threadMockToNotifyCamundaFlow;
74     private ScheduledThreadPoolExecutor executorMock;
75
76     @Before
77     public void init() throws NoSuchFieldException, IllegalAccessException, IOException {
78         when(env.getProperty(eq("pnf.kafka.kafkaBootstrapServers"))).thenReturn(KAFKA_BOOTSTRAP_SERVERS);
79         when(env.getProperty(eq("pnf.kafka.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
80         when(env.getProperty(eq("pnf.kafka.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
81         when(env.getProperty(eq("pnf.kafka.consumerId"))).thenReturn(CONSUMER_ID);
82         when(env.getProperty(eq("pnf.kafka.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
83         when(env.getProperty(eq("pnf.kafka.consumerGroup"))).thenReturn(CONSUMER_GROUP);
84         when(env.getProperty(eq("pnf.kafka.topicListenerDelayInSeconds"), eq(Integer.class)))
85                 .thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
86         testedObject = new PnfEventReadyKafkaClient(env);
87         testedObjectInnerClassThread = testedObject.new KafkaTopicListenerThread();
88         kafkaConsumerMock = mock(KafkaConsumerImpl.class);
89         threadMockToNotifyCamundaFlow = mock(Runnable.class);
90         executorMock = mock(ScheduledThreadPoolExecutor.class);
91         setPrivateField();
92     }
93
94     /**
95      * Test run method, where the are following conditions:
96      * <p>
97      * - KafkaThreadListener is running, flag is set to true
98      * <p>
99      * - map is filled with one entry with the key that we get from response
100      * <p>
101      * run method should invoke thread from map to notify camunda process, remove element from the map (map is empty)
102      * and shutdown the executor because of empty map
103      */
104     @Test
105     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfUpdate() throws IOException {
106         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
107                 .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
108                         JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
109         testedObjectInnerClassThread.run();
110         verify(kafkaConsumerMock).get(TOPIC_NAME_UPDATE, CONSUMER_GROUP, CONSUMER_ID_UPDATE);
111         verify(threadMockToNotifyCamundaFlow).run();
112         verify(executorMock).shutdown();
113     }
114
115
116     @Test
117     public void pnfCorrelationIdIsFoundInHttpResponse_notifyAboutPnfReady() throws IOException {
118         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
119                 .thenReturn(Collections.emptyList())
120                 .thenReturn(Arrays.asList(String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID),
121                         JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
122         testedObjectInnerClassThread.run();
123         verify(kafkaConsumerMock).get(TOPIC_NAME, CONSUMER_GROUP, CONSUMER_ID);
124
125         verify(threadMockToNotifyCamundaFlow).run();
126         verify(executorMock).shutdown();
127     }
128
129
130     /**
131      * Test run method, where the are following conditions:
132      * <p>
133      * - KafkaThreadListener is running, flag is set to true
134      * <p>
135      * - map is filled with one entry with the pnfCorrelationId that does not match to pnfCorrelationId taken from http
136      * response. run method should not do anything with the map not run any thread to notify camunda process
137      */
138     @Test
139     public void pnfCorrelationIdIsFoundInHttpResponse_NotFoundInMap() throws IOException {
140         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class))).thenReturn(Arrays.asList(
141                 String.format(JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[0], PNF_CORRELATION_ID_NOT_FOUND_IN_MAP),
142                 JSON_EXAMPLE_WITH_PNF_CORRELATION_ID[1]));
143         testedObjectInnerClassThread.run();
144         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
145     }
146
147     /**
148      * Test run method, where the are following conditions:
149      * <p>
150      * - KafkaThreadListener is running, flag is set to true
151      * <p>
152      * - map is filled with one entry with the pnfCorrelationId but no correlation id is taken from HttpResponse run
153      * method should not do anything with the map and not run any thread to notify camunda process
154      */
155     @Test
156     public void pnfCorrelationIdIsNotFoundInHttpResponse() throws IOException {
157         when(kafkaConsumerMock.get(any(String.class), any(String.class), any(String.class)))
158                 .thenReturn(Arrays.asList(JSON_EXAMPLE_WITH_NO_PNF_CORRELATION_ID));
159         testedObjectInnerClassThread.run();
160         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
161     }
162
163     private void setPrivateField() throws NoSuchFieldException, IllegalAccessException {
164         Field consumerForPnfReadyField = testedObject.getClass().getDeclaredField("consumerForPnfReady");
165         consumerForPnfReadyField.setAccessible(true);
166         consumerForPnfReadyField.set(testedObject, kafkaConsumerMock);
167         consumerForPnfReadyField.setAccessible(false);
168
169         Field consumerForPnfUpdateField = testedObject.getClass().getDeclaredField("consumerForPnfUpdate");
170         consumerForPnfUpdateField.setAccessible(true);
171         consumerForPnfUpdateField.set(testedObject, kafkaConsumerMock);
172         consumerForPnfUpdateField.setAccessible(false);
173
174         Field executorField = testedObject.getClass().getDeclaredField("executor");
175         executorField.setAccessible(true);
176         executorField.set(testedObject, executorMock);
177         executorField.setAccessible(false);
178
179         Field pnfCorrelationToThreadMapField = testedObject.getClass().getDeclaredField("pnfCorrelationIdToThreadMap");
180         pnfCorrelationToThreadMapField.setAccessible(true);
181         Map<String, Runnable> pnfCorrelationToThreadMap = new ConcurrentHashMap<>();
182         pnfCorrelationToThreadMap.put(PNF_CORRELATION_ID, threadMockToNotifyCamundaFlow);
183         pnfCorrelationToThreadMapField.set(testedObject, pnfCorrelationToThreadMap);
184
185         Field threadRunFlag = testedObject.getClass().getDeclaredField("kafkaThreadListenerIsRunning");
186         threadRunFlag.setAccessible(true);
187         threadRunFlag.set(testedObject, true);
188         threadRunFlag.setAccessible(false);
189     }
190
191 }