2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import fj.data.Either;
24 import org.apache.commons.collections.CollectionUtils;
25 import org.junit.jupiter.api.BeforeEach;
26 import org.junit.jupiter.api.Test;
27 import org.mockito.Mockito;
28 import org.openecomp.sdc.be.config.ConfigurationManager;
29 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
31 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
32 import org.openecomp.sdc.be.impl.ComponentsUtils;
33 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
34 import org.openecomp.sdc.common.api.ConfigurationSource;
35 import org.openecomp.sdc.common.impl.ExternalConfiguration;
36 import org.openecomp.sdc.common.impl.FSConfigurationSource;
39 import java.util.ArrayList;
40 import java.util.HashSet;
41 import java.util.List;
43 import java.util.concurrent.atomic.AtomicBoolean;
45 import static org.junit.Assert.assertEquals;
46 import static org.junit.Assert.assertFalse;
47 import static org.junit.Assert.assertTrue;
48 import static org.junit.Assert.fail;
49 import static org.mockito.Mockito.when;
/**
 * Tests for {@code DistributionEngineInitTask}: retry-interval growth, the start/restart task calls,
 * and the result of {@code initFlow()} when the {@code CambriaHandler} collaborator is mocked.
 *
 * NOTE(review): every line of this listing carries a fused line-number prefix and the listing's own
 * numbering skips lines, so annotations, closing braces and some declarations appear to be missing
 * from view — confirm against the original file before compiling.
 */
51 class DistributionEngineInitTaskTest {
// Passed to every DistributionEngineInitTask constructed in these tests; assigned a Mockito mock during fixture setup.
53 private ComponentsUtils componentsUtils;
// Assigned a Mockito mock during fixture setup; the init-flow tests stub it and hand it to the task via setCambriaHandler.
55 private CambriaHandler cambriaHandler;
// Fixture setup statements.
// NOTE(review): the enclosing method header is not visible in this listing; presumably a
// @BeforeEach method, given the BeforeEach import — confirm.
// Select the "catalog-be" application under the test configuration directory.
59 ExternalConfiguration.setAppName("catalog-be");
60 ExternalConfiguration.setConfigDir("src/test/resources/config");
61 ExternalConfiguration.listenForChanges();
// File-system configuration source rooted at <configDir>/<appName>.
// NOTE(review): File is referenced here but no java.io.File import is visible in this listing — confirm.
63 ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), ExternalConfiguration.getConfigDir() + File.separator + ExternalConfiguration.getAppName());
// The instance is discarded; the constructor is invoked for its effect only
// (presumably it registers the configuration globally — TODO confirm).
64 new ConfigurationManager(configurationSource);
// New mocks are assigned to the fields each time these statements run.
66 componentsUtils = Mockito.mock(ComponentsUtils.class);
67 cambriaHandler = Mockito.mock(CambriaHandler.class);
/**
 * Checks how the current retry interval changes over repeated {@code incrementRetryInterval()} calls.
 *
 * After each of the first four increments the interval is compared with {@code retry * 2^i};
 * after a fifth increment it is compared with {@code maxRetry}.
 */
71 void checkIncrement() {
73 String envName = "PrOD";
75 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm their values.
78 deConfiguration.setInitRetryIntervalSec(retry);
79 deConfiguration.setInitMaxIntervalSec(maxRetry);
// The sixth constructor argument is passed as null; 0l is the long literal zero (an uppercase L suffix would read more clearly).
80 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
// i runs from 1 to 4; the value compared against doubles on every pass.
82 for (int i = 1; i < 5; i++) {
83 initTask.incrementRetryInterval();
// NOTE(review): org.junit.Assert.assertEquals takes (message, expected, actual); the value read from the task
// is passed in the expected position here, so a failure report would label the two values the wrong way round.
84 assertEquals("check next retry interval", initTask.getCurrentRetryInterval(), retry * (long) Math.pow(2, i));
// One further increment: the interval is now compared with maxRetry
// (presumably the task caps the interval there — confirm against incrementRetryInterval).
87 initTask.incrementRetryInterval();
88 assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
/**
 * Builds an init task for environment "PrOD" with the mocked {@code ComponentsUtils} and calls {@code startTask()}.
 * No assertion is visible after the call, so the visible lines only exercise it.
 */
93 void checkStartTask() {
95 String envName = "PrOD";
97 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
100 deConfiguration.setInitRetryIntervalSec(retry);
101 deConfiguration.setInitMaxIntervalSec(maxRetry);
102 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
104 initTask.startTask();
/**
 * Builds an init task for environment "PrOD" with the mocked {@code ComponentsUtils} and calls {@code restartTask()}.
 * No assertion is visible after the call, so the visible lines only exercise it.
 */
108 void checkRestartTask() {
110 String envName = "PrOD";
112 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
115 deConfiguration.setInitRetryIntervalSec(retry);
116 deConfiguration.setInitMaxIntervalSec(maxRetry);
117 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
119 initTask.restartTask();
/**
 * Builds an init task for environment "PrOD" with the mocked {@code ComponentsUtils}.
 *
 * NOTE(review): the only call visible on the task is {@code startTask()}; the listing's own numbering
 * skips lines on both sides of it, so further calls (presumably the stop this test is named for)
 * may be missing from view — confirm.
 */
123 void checkStopTask() {
125 String envName = "PrOD";
127 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
130 deConfiguration.setInitRetryIntervalSec(retry);
131 deConfiguration.setInitMaxIntervalSec(maxRetry);
132 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
135 initTask.startTask();
/**
 * Builds an init task for environment "PrOD" with the mocked {@code ComponentsUtils}.
 *
 * NOTE(review): no call on the task is visible after construction; the listing's own numbering skips
 * lines after the constructor, so the call this test is named for is presumably missing from view — confirm.
 */
140 void checkDestroy() {
142 String envName = "PrOD";
144 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
147 deConfiguration.setInitRetryIntervalSec(retry);
148 deConfiguration.setInitMaxIntervalSec(maxRetry);
149 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
// Init-flow scenario: getTopics reports NOT_FOUND, topic creation and registration are stubbed to return OK.
// NOTE(review): the enclosing method header is not visible in this listing, and the initFlow() result
// stored at the end is not asserted in the visible lines — confirm what follows.
157 String notifTopic = "notif";
158 String statusTopic = "status";
160 List<String> uebServers = new ArrayList<>();
161 uebServers.add("server1");
// getTopics is stubbed to return the right-hand (CambriaErrorResponse) side carrying NOT_FOUND.
// NOTE(review): Set is used here but no java.util.Set import is visible in this listing — confirm.
162 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
163 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
164 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
166 String envName = "PrOD";
168 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
169 deConfiguration.setUebServers(uebServers);
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
172 deConfiguration.setInitRetryIntervalSec(retry);
173 deConfiguration.setInitMaxIntervalSec(maxRetry);
174 deConfiguration.setDistributionNotifTopicName(notifTopic);
175 deConfiguration.setDistributionStatusTopicName(statusTopic);
// Topic-creation settings: one partition, one replica.
176 CreateTopicConfig createTopic = new CreateTopicConfig();
177 createTopic.setPartitionCount(1);
178 createTopic.setReplicationCount(1);
179 deConfiguration.setCreateTopic(createTopic);
// The variable is re-pointed at a new OK response; the NOT_FOUND instance stays inside `right`.
181 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
// Topic names the stubs match on: configured name + "-" + upper-cased environment, i.e. "notif-PROD" and "status-PROD".
183 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
184 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
// createTopic returns OK only when the fourth argument equals one of the names above and the last two arguments equal 1.
185 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
186 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
// A second, separate OK response is used for registration; registerToTopic returns it for any arguments.
188 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
189 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
190 .thenReturn(cambriaErrorResponse);
// Build the task and swap in the stubbed handler before running the flow.
192 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
193 initTask.setCambriaHandler(cambriaHandler);
195 boolean initFlow = initTask.initFlow();
/**
 * Init flow is expected to succeed when getTopics reports NOT_FOUND and both topic creation and
 * topic registration are stubbed to return OK.
 */
201 void testInitFlowScenarioSuccess() {
203 String notifTopic = "notif";
204 String statusTopic = "status";
206 List<String> uebServers = new ArrayList<>();
207 uebServers.add("server1");
// getTopics is stubbed to return the right-hand (CambriaErrorResponse) side carrying NOT_FOUND.
208 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
209 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
210 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
212 String envName = "PrOD";
214 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
215 deConfiguration.setUebServers(uebServers);
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
218 deConfiguration.setInitRetryIntervalSec(retry);
219 deConfiguration.setInitMaxIntervalSec(maxRetry);
220 deConfiguration.setDistributionNotifTopicName(notifTopic);
221 deConfiguration.setDistributionStatusTopicName(statusTopic);
// Topic-creation settings: one partition, one replica.
222 CreateTopicConfig createTopic = new CreateTopicConfig();
223 createTopic.setPartitionCount(1);
224 createTopic.setReplicationCount(1);
225 deConfiguration.setCreateTopic(createTopic);
// The variable is re-pointed at a new OK response; the NOT_FOUND instance stays inside `right`.
227 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
// Topic names the stubs match on: "notif-PROD" and "status-PROD".
229 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
230 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
// createTopic returns OK only when the fourth argument equals one of the names above and the last two arguments equal 1.
231 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
232 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
// registerToTopic returns a separate OK response for any arguments.
234 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
235 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
236 .thenReturn(cambriaErrorResponse);
// Build the task and swap in the stubbed handler before running the flow.
238 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
239 initTask.setCambriaHandler(cambriaHandler);
241 boolean initFlow = initTask.initFlow();
242 assertTrue("check init flow succeed", initFlow);
/**
 * Init flow is expected to succeed when getTopics already lists both environment topics.
 * createTopic is not stubbed in this test; only registerToTopic is, returning OK.
 */
247 void testInitFlowScenarioSuccessTopicsAlreadyExists() {
249 String envName = "PrOD";
250 String notifTopic = "notif";
251 String statusTopic = "status";
// Topic names for this environment: "notif-PROD" and "status-PROD".
253 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
254 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
// The set of topics that getTopics will report as existing.
256 Set<String> topics = new HashSet<>();
257 topics.add(realNotifTopic);
258 topics.add(realStatusTopic);
260 List<String> uebServers = new ArrayList<>();
261 uebServers.add("server1");
// getTopics is stubbed to return the left-hand (Set<String>) side holding both topic names.
262 Either<Set<String>, CambriaErrorResponse> left = Either.left(topics);
264 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(left);
266 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
267 deConfiguration.setUebServers(uebServers);
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
270 deConfiguration.setInitRetryIntervalSec(retry);
271 deConfiguration.setInitMaxIntervalSec(maxRetry);
272 deConfiguration.setDistributionNotifTopicName(notifTopic);
273 deConfiguration.setDistributionStatusTopicName(statusTopic);
274 CreateTopicConfig createTopic = new CreateTopicConfig();
275 createTopic.setPartitionCount(1);
276 createTopic.setReplicationCount(1);
277 deConfiguration.setCreateTopic(createTopic);
// registerToTopic returns OK for any arguments.
279 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
280 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
281 .thenReturn(cambriaErrorResponse);
283 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
284 initTask.setCambriaHandler(cambriaHandler);
// NOTE(review): the catch below has no matching `try {` in the visible lines; presumably it opens just before this call — confirm.
287 boolean initFlow = initTask.initFlow();
288 assertTrue("check init flow succeed", initFlow);
// NOTE(review): fail(...) reports a fixed message and drops the caught exception, so its type and stack trace are lost on failure.
289 } catch (Exception e) {
290 fail("Should not throw exception");
/**
 * Init flow is expected to fail when registration to the status topic returns CONNNECTION_ERROR,
 * even though topic creation and registration to the notification topic return OK.
 */
296 void testInitFlowScenarioFailToRegister() {
298 String notifTopic = "notif";
299 String statusTopic = "status";
301 List<String> uebServers = new ArrayList<>();
302 uebServers.add("server1");
// getTopics is stubbed to return the right-hand (CambriaErrorResponse) side carrying NOT_FOUND.
303 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
304 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
305 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
307 String envName = "PrOD";
309 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
310 deConfiguration.setUebServers(uebServers);
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
313 deConfiguration.setInitRetryIntervalSec(retry);
314 deConfiguration.setInitMaxIntervalSec(maxRetry);
315 deConfiguration.setDistributionNotifTopicName(notifTopic);
316 deConfiguration.setDistributionStatusTopicName(statusTopic);
// Topic-creation settings: one partition, one replica.
317 CreateTopicConfig createTopic = new CreateTopicConfig();
318 createTopic.setPartitionCount(1);
319 createTopic.setReplicationCount(1);
320 deConfiguration.setCreateTopic(createTopic);
// The variable is re-pointed at a new OK response used by the createTopic stubs.
322 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
// Topic names the stubs match on: "notif-PROD" and "status-PROD".
324 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
325 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
// createTopic returns OK for both topic names.
326 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
327 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
// Registration is stubbed per topic name (matched on the last argument): OK for the notification topic...
329 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic)))
330 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK));
// ...and CONNNECTION_ERROR for the status topic, which is the failure this test targets.
332 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic)))
333 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR));
// Build the task and swap in the stubbed handler before running the flow.
336 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
337 initTask.setCambriaHandler(cambriaHandler);
339 boolean initFlow = initTask.initFlow();
340 assertFalse("check init flow failed", initFlow);
/**
 * Init flow is expected to fail when getTopics returns CONNNECTION_ERROR.
 * No createTopic or registerToTopic stubs are set up in this test.
 */
345 void testInitFlowScenario1GetTopicsFailed() {
347 List<String> uebServers = new ArrayList<>();
348 uebServers.add("server1");
// getTopics is stubbed to return the right-hand (CambriaErrorResponse) side carrying CONNNECTION_ERROR.
349 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR);
350 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
351 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
353 String envName = "PrOD";
355 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
356 deConfiguration.setUebServers(uebServers);
// NOTE(review): retry and maxRetry are used below but their declarations are not visible in this listing — confirm.
359 deConfiguration.setInitRetryIntervalSec(retry);
360 deConfiguration.setInitMaxIntervalSec(maxRetry);
// Build the task and swap in the stubbed handler before running the flow.
361 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
362 initTask.setCambriaHandler(cambriaHandler);
364 boolean initFlow = initTask.initFlow();
365 assertFalse("check init flow failed", initFlow);
/**
 * Builds an {@code OperationalEnvironmentEntry} from a distribution-engine configuration.
 *
 * Copies the UEB public and secret keys, collects the configured UEB servers (if any) into a set
 * used as the DMaaP/UEB address, and sets the environment id to the first configured environment,
 * or "UNKNOWN" when none is configured.
 *
 * NOTE(review): the return statement and closing brace are not visible in this listing;
 * presumably the method returns {@code entry} — confirm.
 *
 * @param distributionEngineConfiguration configuration to read keys, servers and environments from
 */
369 private OperationalEnvironmentEntry readEnvFromConfig(DistributionEngineConfiguration distributionEngineConfiguration) {
370 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
371 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
372 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
// The set stays empty when no servers are configured (getUebServers() returns null).
373 Set<String> puebEndpoints = new HashSet<>();
374 if(distributionEngineConfiguration.getUebServers() != null)
375 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
376 entry.setDmaapUebAddress(puebEndpoints);
// Fallback id, kept when the environments list is null or empty.
377 String envName = "UNKNOWN";
378 if(CollectionUtils.isNotEmpty(distributionEngineConfiguration.getEnvironments()))
379 envName = distributionEngineConfiguration.getEnvironments().get(0);
380 entry.setEnvironmentId(envName);