[SDC] Add kafka native messaging
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEngineInitTaskTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import fj.data.Either;
24 import org.apache.commons.collections.CollectionUtils;
25 import org.junit.jupiter.api.BeforeEach;
26 import org.junit.jupiter.api.Test;
27 import org.mockito.Mockito;
28 import org.openecomp.sdc.be.components.kafka.KafkaHandler;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
32 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.api.ConfigurationSource;
36 import org.openecomp.sdc.common.impl.ExternalConfiguration;
37 import org.openecomp.sdc.common.impl.FSConfigurationSource;
38
39 import java.io.File;
40 import java.util.ArrayList;
41 import java.util.HashSet;
42 import java.util.List;
43 import java.util.Set;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.junit.Assert.fail;
50 import static org.mockito.Mockito.when;
51
52 class DistributionEngineInitTaskTest {
53
54     private ComponentsUtils componentsUtils;
55
56     private CambriaHandler cambriaHandler;
57
58     private KafkaHandler kafkaHandler;
59
60     @BeforeEach
61     public void setup() {
62         ExternalConfiguration.setAppName("catalog-be");
63         ExternalConfiguration.setConfigDir("src/test/resources/config");
64         ExternalConfiguration.listenForChanges();
65
66         ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), ExternalConfiguration.getConfigDir() + File.separator + ExternalConfiguration.getAppName());
67         new ConfigurationManager(configurationSource);
68
69         componentsUtils = Mockito.mock(ComponentsUtils.class);
70         cambriaHandler = Mockito.mock(CambriaHandler.class);
71         kafkaHandler = Mockito.mock(KafkaHandler.class);
72     }
73
74     @Test
75     void checkIncrement() {
76
77         String envName = "PrOD";
78
79         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
80         int retry = 2;
81         int maxRetry = 40;
82         deConfiguration.setInitRetryIntervalSec(retry);
83         deConfiguration.setInitMaxIntervalSec(maxRetry);
84         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
85
86         for (int i = 1; i < 5; i++) {
87             initTask.incrementRetryInterval();
88             assertEquals("check next retry interval", initTask.getCurrentRetryInterval(), retry * (long) Math.pow(2, i));
89         }
90
91         initTask.incrementRetryInterval();
92         assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
93
94     }
95
96     @Test
97     void checkStartTask() {
98
99         String envName = "PrOD";
100
101         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
102         int retry = 2;
103         int maxRetry = 40;
104         deConfiguration.setInitRetryIntervalSec(retry);
105         deConfiguration.setInitMaxIntervalSec(maxRetry);
106         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
107
108         initTask.startTask();
109     }
110
111     @Test
112     void checkRestartTask() {
113
114         String envName = "PrOD";
115
116         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
117         int retry = 2;
118         int maxRetry = 40;
119         deConfiguration.setInitRetryIntervalSec(retry);
120         deConfiguration.setInitMaxIntervalSec(maxRetry);
121         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
122
123         initTask.restartTask();
124     }
125
126     @Test
127     void checkStopTask() {
128
129         String envName = "PrOD";
130
131         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
132         int retry = 2;
133         int maxRetry = 40;
134         deConfiguration.setInitRetryIntervalSec(retry);
135         deConfiguration.setInitMaxIntervalSec(maxRetry);
136         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
137
138         initTask.stopTask();
139         initTask.startTask();
140         initTask.stopTask();
141     }
142
143     @Test
144     void checkDestroy() {
145
146         String envName = "PrOD";
147
148         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
149         int retry = 2;
150         int maxRetry = 40;
151         deConfiguration.setInitRetryIntervalSec(retry);
152         deConfiguration.setInitMaxIntervalSec(maxRetry);
153         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
154
155         initTask.destroy();
156     }
157
158     @Test
159     void checkRun() {
160
161         String notifTopic = "notif";
162         String statusTopic = "status";
163
164         List<String> uebServers = new ArrayList<>();
165         uebServers.add("server1");
166         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
167         Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
168         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
169
170         String envName = "PrOD";
171
172         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
173         deConfiguration.setUebServers(uebServers);
174         int retry = 2;
175         int maxRetry = 40;
176         deConfiguration.setInitRetryIntervalSec(retry);
177         deConfiguration.setInitMaxIntervalSec(maxRetry);
178         deConfiguration.setDistributionNotifTopicName(notifTopic);
179         deConfiguration.setDistributionStatusTopicName(statusTopic);
180         CreateTopicConfig createTopic = new CreateTopicConfig();
181         createTopic.setPartitionCount(1);
182         createTopic.setReplicationCount(1);
183         deConfiguration.setCreateTopic(createTopic);
184
185         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
186
187         String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
188         String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
189         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
190         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
191
192         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
193         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
194                 .thenReturn(cambriaErrorResponse);
195
196         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
197         initTask.setCambriaHandler(cambriaHandler);
198
199         boolean initFlow = initTask.initFlow();
200
201         initTask.run();
202     }
203
204     @Test
205     void testInitFlowScenarioSuccess() {
206
207         String notifTopic = "notif";
208         String statusTopic = "status";
209
210         List<String> uebServers = new ArrayList<>();
211         uebServers.add("server1");
212         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
213         Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
214         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
215
216         String envName = "PrOD";
217
218         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
219         deConfiguration.setUebServers(uebServers);
220         int retry = 2;
221         int maxRetry = 40;
222         deConfiguration.setInitRetryIntervalSec(retry);
223         deConfiguration.setInitMaxIntervalSec(maxRetry);
224         deConfiguration.setDistributionNotifTopicName(notifTopic);
225         deConfiguration.setDistributionStatusTopicName(statusTopic);
226         CreateTopicConfig createTopic = new CreateTopicConfig();
227         createTopic.setPartitionCount(1);
228         createTopic.setReplicationCount(1);
229         deConfiguration.setCreateTopic(createTopic);
230
231         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
232
233         String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
234         String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
235         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
236         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
237
238         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
239         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
240                 .thenReturn(cambriaErrorResponse);
241
242         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
243         initTask.setCambriaHandler(cambriaHandler);
244
245         boolean initFlow = initTask.initFlow();
246         assertTrue("check init flow succeed", initFlow);
247
248     }
249
250     @Test
251     void testInitFlowSuccessKafkaEnabled(){
252         DistributionEngineConfiguration config = new DistributionEngineConfiguration();
253         config.setInitRetryIntervalSec(1);
254         config.setInitMaxIntervalSec(1);
255
256         when(kafkaHandler.isKafkaActive()).thenReturn(true);
257         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, config, null, new AtomicBoolean(false), componentsUtils, null, null);
258         initTask.setKafkaHandler(kafkaHandler);
259
260         boolean initFlow = initTask.initFlow();
261         assertTrue("check init flow succeed", initFlow);
262     }
263
264     @Test
265     void testInitFlowScenarioSuccessTopicsAlreadyExists() {
266
267         String envName = "PrOD";
268         String notifTopic = "notif";
269         String statusTopic = "status";
270
271         String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
272         String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
273
274         Set<String> topics = new HashSet<>();
275         topics.add(realNotifTopic);
276         topics.add(realStatusTopic);
277
278         List<String> uebServers = new ArrayList<>();
279         uebServers.add("server1");
280         Either<Set<String>, CambriaErrorResponse> left = Either.left(topics);
281
282         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(left);
283
284         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
285         deConfiguration.setUebServers(uebServers);
286         int retry = 2;
287         int maxRetry = 40;
288         deConfiguration.setInitRetryIntervalSec(retry);
289         deConfiguration.setInitMaxIntervalSec(maxRetry);
290         deConfiguration.setDistributionNotifTopicName(notifTopic);
291         deConfiguration.setDistributionStatusTopicName(statusTopic);
292         CreateTopicConfig createTopic = new CreateTopicConfig();
293         createTopic.setPartitionCount(1);
294         createTopic.setReplicationCount(1);
295         deConfiguration.setCreateTopic(createTopic);
296
297         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
298         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
299                 .thenReturn(cambriaErrorResponse);
300
301         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
302         initTask.setCambriaHandler(cambriaHandler);
303
304         try {
305             boolean initFlow = initTask.initFlow();
306             assertTrue("check init flow succeed", initFlow);
307         } catch (Exception e) {
308             fail("Should not throw exception");
309         }
310
311     }
312
313     @Test
314     void testInitFlowScenarioFailToRegister() {
315
316         String notifTopic = "notif";
317         String statusTopic = "status";
318
319         List<String> uebServers = new ArrayList<>();
320         uebServers.add("server1");
321         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
322         Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
323         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
324
325         String envName = "PrOD";
326
327         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
328         deConfiguration.setUebServers(uebServers);
329         int retry = 2;
330         int maxRetry = 40;
331         deConfiguration.setInitRetryIntervalSec(retry);
332         deConfiguration.setInitMaxIntervalSec(maxRetry);
333         deConfiguration.setDistributionNotifTopicName(notifTopic);
334         deConfiguration.setDistributionStatusTopicName(statusTopic);
335         CreateTopicConfig createTopic = new CreateTopicConfig();
336         createTopic.setPartitionCount(1);
337         createTopic.setReplicationCount(1);
338         deConfiguration.setCreateTopic(createTopic);
339
340         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
341
342         String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
343         String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
344         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
345         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
346
347         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic)))
348                 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK));
349
350         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic)))
351                 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR));
352
353
354         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
355         initTask.setCambriaHandler(cambriaHandler);
356
357         boolean initFlow = initTask.initFlow();
358         assertFalse("check init flow failed", initFlow);
359
360     }
361
362     @Test
363     void testInitFlowScenario1GetTopicsFailed() {
364
365         List<String> uebServers = new ArrayList<>();
366         uebServers.add("server1");
367         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR);
368         Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
369         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
370
371         String envName = "PrOD";
372
373         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
374         deConfiguration.setUebServers(uebServers);
375         int retry = 2;
376         int maxRetry = 40;
377         deConfiguration.setInitRetryIntervalSec(retry);
378         deConfiguration.setInitMaxIntervalSec(maxRetry);
379         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
380         initTask.setCambriaHandler(cambriaHandler);
381
382         boolean initFlow = initTask.initFlow();
383         assertFalse("check init flow failed", initFlow);
384
385     }
386
387     private OperationalEnvironmentEntry readEnvFromConfig(DistributionEngineConfiguration distributionEngineConfiguration) {
388         OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
389         entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
390         entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
391         Set<String> puebEndpoints = new HashSet<>();
392         if(distributionEngineConfiguration.getUebServers() != null)
393             puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
394         entry.setDmaapUebAddress(puebEndpoints);
395         String envName = "UNKNOWN";
396         if(CollectionUtils.isNotEmpty(distributionEngineConfiguration.getEnvironments()))
397             envName = distributionEngineConfiguration.getEnvironments().get(0);
398         entry.setEnvironmentId(envName);
399         return entry;
400     }
401
402 }