2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import fj.data.Either;
24 import org.apache.commons.collections.CollectionUtils;
25 import org.junit.jupiter.api.BeforeEach;
26 import org.junit.jupiter.api.Test;
27 import org.mockito.Mockito;
28 import org.openecomp.sdc.be.components.kafka.KafkaHandler;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
32 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.api.ConfigurationSource;
36 import org.openecomp.sdc.common.impl.ExternalConfiguration;
37 import org.openecomp.sdc.common.impl.FSConfigurationSource;
40 import java.util.ArrayList;
41 import java.util.HashSet;
42 import java.util.List;
44 import java.util.concurrent.atomic.AtomicBoolean;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.junit.Assert.fail;
50 import static org.mockito.Mockito.when;
// Tests for DistributionEngineInitTask: retry-interval handling, task lifecycle calls and
// the initFlow() outcome under different stubbed CambriaHandler / KafkaHandler responses.
52 class DistributionEngineInitTaskTest {
// Collaborators shared by the tests; each is assigned a Mockito mock in the fixture
// initialisation code further down in this class.
54 private ComponentsUtils componentsUtils;
56 private CambriaHandler cambriaHandler;
58 private KafkaHandler kafkaHandler;
// Fixture initialisation (the enclosing method header is not visible in this view).
// Points ExternalConfiguration at the test configuration directory for the "catalog-be" app.
62 ExternalConfiguration.setAppName("catalog-be");
63 ExternalConfiguration.setConfigDir("src/test/resources/config");
64 ExternalConfiguration.listenForChanges();
// File-system configuration source rooted at <configDir>/<appName>.
// NOTE(review): File is referenced here but no java.io.File import is visible in this view — confirm it is imported.
66 ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), ExternalConfiguration.getConfigDir() + File.separator + ExternalConfiguration.getAppName());
// The instance is discarded; the constructor is called for its effects only.
// NOTE(review): presumably this installs the configuration globally — confirm in ConfigurationManager.
67 new ConfigurationManager(configurationSource);
// Fresh mocks for the collaborators used by the tests.
69 componentsUtils = Mockito.mock(ComponentsUtils.class);
70 cambriaHandler = Mockito.mock(CambriaHandler.class);
71 kafkaHandler = Mockito.mock(KafkaHandler.class);
// Checks incrementRetryInterval(): after the i-th call (i = 1..4) the current retry interval
// is asserted to be retry * 2^i, and after one further call it is asserted to be maxRetry.
// `retry` and `maxRetry` are declared on lines that are not visible in this view.
75 void checkIncrement() {
77 String envName = "PrOD";
79 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
82 deConfiguration.setInitRetryIntervalSec(retry);
83 deConfiguration.setInitMaxIntervalSec(maxRetry);
// The sixth constructor argument is null; the last one is an environment entry built from the configuration.
84 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
86 for (int i = 1; i < 5; i++) {
87 initTask.incrementRetryInterval();
// NOTE(review): org.junit.Assert.assertEquals takes (message, expected, actual); the value read
// from the task is passed in the "expected" position here, so a failure report would label the
// two values the wrong way round.
88 assertEquals("check next retry interval", initTask.getCurrentRetryInterval(), retry * (long) Math.pow(2, i));
// Fifth increment: the interval is asserted to equal maxRetry (same argument-order remark as above).
91 initTask.incrementRetryInterval();
92 assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
// Builds a DistributionEngineInitTask from a minimal configuration and calls startTask().
// No assertion is visible in this view; `retry` and `maxRetry` are declared on lines not shown here.
97 void checkStartTask() {
99 String envName = "PrOD";
101 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
104 deConfiguration.setInitRetryIntervalSec(retry);
105 deConfiguration.setInitMaxIntervalSec(maxRetry);
106 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
108 initTask.startTask();
// Builds a DistributionEngineInitTask from a minimal configuration and calls restartTask()
// without a preceding startTask() in the visible lines.
// No assertion is visible in this view; `retry` and `maxRetry` are declared on lines not shown here.
112 void checkRestartTask() {
114 String envName = "PrOD";
116 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
119 deConfiguration.setInitRetryIntervalSec(retry);
120 deConfiguration.setInitMaxIntervalSec(maxRetry);
121 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
123 initTask.restartTask();
// Builds a DistributionEngineInitTask from a minimal configuration and starts it.
// NOTE(review): despite the method name, no stop call is visible in this view — it may sit on a
// line that is not shown here; confirm against the full file.
// `retry` and `maxRetry` are declared on lines not shown here.
127 void checkStopTask() {
129 String envName = "PrOD";
131 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
134 deConfiguration.setInitRetryIntervalSec(retry);
135 deConfiguration.setInitMaxIntervalSec(maxRetry);
136 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
139 initTask.startTask();
// Builds a DistributionEngineInitTask from a minimal configuration.
// NOTE(review): only the construction is visible in this view; the call under test (presumably a
// destroy call, going by the method name) is not shown — confirm against the full file.
// `retry` and `maxRetry` are declared on lines not shown here.
144 void checkDestroy() {
146 String envName = "PrOD";
148 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
151 deConfiguration.setInitRetryIntervalSec(retry);
152 deConfiguration.setInitMaxIntervalSec(maxRetry);
153 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
// Body of an init-flow test whose method header is not visible in this view.
// Scenario: getTopics reports NOT_FOUND, topic creation and registration are stubbed to return OK,
// then initFlow() is invoked. No assertion on the result is visible in this view.
161 String notifTopic = "notif";
162 String statusTopic = "status";
164 List<String> uebServers = new ArrayList<>();
165 uebServers.add("server1");
// getTopics is stubbed to return a right (error) value carrying NOT_FOUND.
166 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
167 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
168 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
170 String envName = "PrOD";
172 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
173 deConfiguration.setUebServers(uebServers);
// `retry` and `maxRetry` are declared on lines not shown here.
176 deConfiguration.setInitRetryIntervalSec(retry);
177 deConfiguration.setInitMaxIntervalSec(maxRetry);
178 deConfiguration.setDistributionNotifTopicName(notifTopic);
179 deConfiguration.setDistributionStatusTopicName(statusTopic);
180 CreateTopicConfig createTopic = new CreateTopicConfig();
181 createTopic.setPartitionCount(1);
182 createTopic.setReplicationCount(1);
183 deConfiguration.setCreateTopic(createTopic);
// The variable is re-pointed to an OK response, which the createTopic stubs below return.
185 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
// Topic names the stubs match on: "<configured name>-<envName upper-cased>", i.e. "notif-PROD" and "status-PROD".
187 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
188 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
// createTopic returns OK only for these topic names with partition count 1 and replication count 1.
189 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
190 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
// registerToTopic returns OK for any arguments.
192 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
193 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
194 .thenReturn(cambriaErrorResponse);
// Inject the mocked handler into the task before running the flow.
196 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
197 initTask.setCambriaHandler(cambriaHandler);
199 boolean initFlow = initTask.initFlow();
// Scenario: getTopics reports NOT_FOUND, creation of both topics and registration are stubbed to
// return OK; initFlow() is asserted to return true.
205 void testInitFlowScenarioSuccess() {
207 String notifTopic = "notif";
208 String statusTopic = "status";
210 List<String> uebServers = new ArrayList<>();
211 uebServers.add("server1");
// getTopics is stubbed to return a right (error) value carrying NOT_FOUND.
212 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
213 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
214 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
216 String envName = "PrOD";
218 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
219 deConfiguration.setUebServers(uebServers);
// `retry` and `maxRetry` are declared on lines not shown here.
222 deConfiguration.setInitRetryIntervalSec(retry);
223 deConfiguration.setInitMaxIntervalSec(maxRetry);
224 deConfiguration.setDistributionNotifTopicName(notifTopic);
225 deConfiguration.setDistributionStatusTopicName(statusTopic);
226 CreateTopicConfig createTopic = new CreateTopicConfig();
227 createTopic.setPartitionCount(1);
228 createTopic.setReplicationCount(1);
229 deConfiguration.setCreateTopic(createTopic);
// The variable is re-pointed to an OK response, which the createTopic stubs below return.
231 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
// Topic names the stubs match on: "notif-PROD" and "status-PROD" (envName upper-cased).
233 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
234 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
// createTopic returns OK only for these topic names with partition count 1 and replication count 1.
235 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
236 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
// registerToTopic returns OK for any arguments.
238 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
239 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
240 .thenReturn(cambriaErrorResponse);
// Inject the mocked handler into the task before running the flow.
242 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
243 initTask.setCambriaHandler(cambriaHandler);
245 boolean initFlow = initTask.initFlow();
246 assertTrue("check init flow succeed", initFlow);
// Scenario: the Kafka handler reports itself active; initFlow() is asserted to return true.
// No CambriaHandler is set on the task and nothing is stubbed on it in this test.
251 void testInitFlowSuccessKafkaEnabled(){
252 DistributionEngineConfiguration config = new DistributionEngineConfiguration();
253 config.setInitRetryIntervalSec(1);
254 config.setInitMaxIntervalSec(1);
256 when(kafkaHandler.isKafkaActive()).thenReturn(true);
// Environment name and environment entry are both passed as null here.
257 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, config, null, new AtomicBoolean(false), componentsUtils, null, null);
258 initTask.setKafkaHandler(kafkaHandler);
260 boolean initFlow = initTask.initFlow();
261 assertTrue("check init flow succeed", initFlow);
// Scenario: getTopics already lists both derived topic names, registration is stubbed to return OK;
// initFlow() is asserted to return true and any exception fails the test.
// createTopic is not stubbed in this test.
265 void testInitFlowScenarioSuccessTopicsAlreadyExists() {
267 String envName = "PrOD";
268 String notifTopic = "notif";
269 String statusTopic = "status";
// Derived topic names: "notif-PROD" and "status-PROD" (envName upper-cased).
271 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
272 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
274 Set<String> topics = new HashSet<>();
275 topics.add(realNotifTopic);
276 topics.add(realStatusTopic);
278 List<String> uebServers = new ArrayList<>();
279 uebServers.add("server1");
// getTopics is stubbed to return a left (success) value holding both topic names.
280 Either<Set<String>, CambriaErrorResponse> left = Either.left(topics);
282 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(left);
284 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
285 deConfiguration.setUebServers(uebServers);
// `retry` and `maxRetry` are declared on lines not shown here.
288 deConfiguration.setInitRetryIntervalSec(retry);
289 deConfiguration.setInitMaxIntervalSec(maxRetry);
290 deConfiguration.setDistributionNotifTopicName(notifTopic);
291 deConfiguration.setDistributionStatusTopicName(statusTopic);
292 CreateTopicConfig createTopic = new CreateTopicConfig();
293 createTopic.setPartitionCount(1);
294 createTopic.setReplicationCount(1);
295 deConfiguration.setCreateTopic(createTopic);
// registerToTopic returns OK for any arguments.
297 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
298 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
299 .thenReturn(cambriaErrorResponse);
301 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
302 initTask.setCambriaHandler(cambriaHandler);
// The opening of the try block that pairs with the catch below is not visible in this view.
305 boolean initFlow = initTask.initFlow();
306 assertTrue("check init flow succeed", initFlow);
// Any exception is turned into an explicit test failure.
// NOTE(review): the caught exception is not included in the failure message, so its cause is lost.
307 } catch (Exception e) {
308 fail("Should not throw exception");
// Scenario: getTopics reports NOT_FOUND, creation of both topics returns OK, registration returns
// OK for the notification topic but CONNNECTION_ERROR for the status topic; initFlow() is asserted
// to return false.
314 void testInitFlowScenarioFailToRegister() {
316 String notifTopic = "notif";
317 String statusTopic = "status";
319 List<String> uebServers = new ArrayList<>();
320 uebServers.add("server1");
// getTopics is stubbed to return a right (error) value carrying NOT_FOUND.
321 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
322 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
323 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
325 String envName = "PrOD";
327 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
328 deConfiguration.setUebServers(uebServers);
// `retry` and `maxRetry` are declared on lines not shown here.
331 deConfiguration.setInitRetryIntervalSec(retry);
332 deConfiguration.setInitMaxIntervalSec(maxRetry);
333 deConfiguration.setDistributionNotifTopicName(notifTopic);
334 deConfiguration.setDistributionStatusTopicName(statusTopic);
335 CreateTopicConfig createTopic = new CreateTopicConfig();
336 createTopic.setPartitionCount(1);
337 createTopic.setReplicationCount(1);
338 deConfiguration.setCreateTopic(createTopic);
// The variable is re-pointed to an OK response, which the createTopic stubs below return.
340 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
// Topic names the stubs match on: "notif-PROD" and "status-PROD" (envName upper-cased).
342 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
343 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
344 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
345 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
// Registration stubs are keyed on the sixth argument: the notification topic succeeds...
347 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic)))
348 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK));
// ...while the status topic fails with a connection error.
350 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic)))
351 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR));
354 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
355 initTask.setCambriaHandler(cambriaHandler);
357 boolean initFlow = initTask.initFlow();
358 assertFalse("check init flow failed", initFlow);
// Scenario: getTopics fails with CONNNECTION_ERROR; initFlow() is asserted to return false.
// Neither createTopic nor registerToTopic is stubbed in this test.
363 void testInitFlowScenario1GetTopicsFailed() {
365 List<String> uebServers = new ArrayList<>();
366 uebServers.add("server1");
// getTopics is stubbed to return a right (error) value carrying CONNNECTION_ERROR.
367 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR);
368 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
369 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
371 String envName = "PrOD";
373 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
374 deConfiguration.setUebServers(uebServers);
// `retry` and `maxRetry` are declared on lines not shown here.
377 deConfiguration.setInitRetryIntervalSec(retry);
378 deConfiguration.setInitMaxIntervalSec(maxRetry);
379 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
380 initTask.setCambriaHandler(cambriaHandler);
382 boolean initFlow = initTask.initFlow();
383 assertFalse("check init flow failed", initFlow);
387 private OperationalEnvironmentEntry readEnvFromConfig(DistributionEngineConfiguration distributionEngineConfiguration) {
388 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
389 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
390 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
391 Set<String> puebEndpoints = new HashSet<>();
392 if(distributionEngineConfiguration.getUebServers() != null)
393 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
394 entry.setDmaapUebAddress(puebEndpoints);
395 String envName = "UNKNOWN";
396 if(CollectionUtils.isNotEmpty(distributionEngineConfiguration.getEnvironments()))
397 envName = distributionEngineConfiguration.getEnvironments().get(0);
398 entry.setEnvironmentId(envName);