2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import fj.data.Either;
24 import org.apache.commons.collections.CollectionUtils;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.Mockito;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
32 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.api.ConfigurationSource;
36 import org.openecomp.sdc.common.impl.ExternalConfiguration;
37 import org.openecomp.sdc.common.impl.FSConfigurationSource;
40 import java.util.ArrayList;
41 import java.util.HashSet;
42 import java.util.List;
44 import java.util.concurrent.atomic.AtomicBoolean;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.junit.Assert.fail;
50 import static org.mockito.Mockito.when;
52 public class DistributionEngineInitTaskTest {
55 private ComponentsUtils componentsUtils;
58 private CambriaHandler cambriaHandler;
62 ExternalConfiguration.setAppName("catalog-be");
63 ExternalConfiguration.setConfigDir("src/test/resources/config");
64 ExternalConfiguration.listenForChanges();
66 ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), ExternalConfiguration.getConfigDir() + File.separator + ExternalConfiguration.getAppName());
67 new ConfigurationManager(configurationSource);
69 componentsUtils = Mockito.mock(ComponentsUtils.class);
70 cambriaHandler = Mockito.mock(CambriaHandler.class);
74 public void checkIncrement() {
76 String envName = "PrOD";
78 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
81 deConfiguration.setInitRetryIntervalSec(retry);
82 deConfiguration.setInitMaxIntervalSec(maxRetry);
83 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
85 for (int i = 1; i < 5; i++) {
86 initTask.incrementRetryInterval();
87 assertEquals("check next retry interval", initTask.getCurrentRetryInterval(), retry * (long) Math.pow(2, i));
90 initTask.incrementRetryInterval();
91 assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
96 public void checkStartTask() {
98 String envName = "PrOD";
100 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
103 deConfiguration.setInitRetryIntervalSec(retry);
104 deConfiguration.setInitMaxIntervalSec(maxRetry);
105 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
107 initTask.startTask();
111 public void checkRestartTask() {
113 String envName = "PrOD";
115 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
118 deConfiguration.setInitRetryIntervalSec(retry);
119 deConfiguration.setInitMaxIntervalSec(maxRetry);
120 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
122 initTask.restartTask();
126 public void checkStopTask() {
128 String envName = "PrOD";
130 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
133 deConfiguration.setInitRetryIntervalSec(retry);
134 deConfiguration.setInitMaxIntervalSec(maxRetry);
135 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
138 initTask.startTask();
143 public void checkDestroy() {
145 String envName = "PrOD";
147 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
150 deConfiguration.setInitRetryIntervalSec(retry);
151 deConfiguration.setInitMaxIntervalSec(maxRetry);
152 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
158 public void checkRun() {
160 String notifTopic = "notif";
161 String statusTopic = "status";
163 List<String> uebServers = new ArrayList<>();
164 uebServers.add("server1");
165 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
166 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
167 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
169 String envName = "PrOD";
171 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
172 deConfiguration.setUebServers(uebServers);
175 deConfiguration.setInitRetryIntervalSec(retry);
176 deConfiguration.setInitMaxIntervalSec(maxRetry);
177 deConfiguration.setDistributionNotifTopicName(notifTopic);
178 deConfiguration.setDistributionStatusTopicName(statusTopic);
179 CreateTopicConfig createTopic = new CreateTopicConfig();
180 createTopic.setPartitionCount(1);
181 createTopic.setReplicationCount(1);
182 deConfiguration.setCreateTopic(createTopic);
184 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
186 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
187 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
188 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
189 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
191 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
192 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
193 .thenReturn(cambriaErrorResponse);
195 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
196 initTask.setCambriaHandler(cambriaHandler);
198 boolean initFlow = initTask.initFlow();
203 @SuppressWarnings("unchecked")
205 public void testInitFlowScenarioSuccess() {
207 String notifTopic = "notif";
208 String statusTopic = "status";
210 List<String> uebServers = new ArrayList<>();
211 uebServers.add("server1");
212 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
213 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
214 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
216 String envName = "PrOD";
218 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
219 deConfiguration.setUebServers(uebServers);
222 deConfiguration.setInitRetryIntervalSec(retry);
223 deConfiguration.setInitMaxIntervalSec(maxRetry);
224 deConfiguration.setDistributionNotifTopicName(notifTopic);
225 deConfiguration.setDistributionStatusTopicName(statusTopic);
226 CreateTopicConfig createTopic = new CreateTopicConfig();
227 createTopic.setPartitionCount(1);
228 createTopic.setReplicationCount(1);
229 deConfiguration.setCreateTopic(createTopic);
231 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
233 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
234 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
235 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
236 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
238 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
239 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
240 .thenReturn(cambriaErrorResponse);
242 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
243 initTask.setCambriaHandler(cambriaHandler);
245 boolean initFlow = initTask.initFlow();
246 assertTrue("check init flow succeed", initFlow);
250 @SuppressWarnings("unchecked")
252 public void testInitFlowScenarioSuccessTopicsAlreadyExists() {
254 String envName = "PrOD";
255 String notifTopic = "notif";
256 String statusTopic = "status";
258 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
259 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
261 Set<String> topics = new HashSet<>();
262 topics.add(realNotifTopic);
263 topics.add(realStatusTopic);
265 List<String> uebServers = new ArrayList<>();
266 uebServers.add("server1");
267 Either<Set<String>, CambriaErrorResponse> left = Either.left(topics);
269 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(left);
271 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
272 deConfiguration.setUebServers(uebServers);
275 deConfiguration.setInitRetryIntervalSec(retry);
276 deConfiguration.setInitMaxIntervalSec(maxRetry);
277 deConfiguration.setDistributionNotifTopicName(notifTopic);
278 deConfiguration.setDistributionStatusTopicName(statusTopic);
279 CreateTopicConfig createTopic = new CreateTopicConfig();
280 createTopic.setPartitionCount(1);
281 createTopic.setReplicationCount(1);
282 deConfiguration.setCreateTopic(createTopic);
284 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
285 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
286 .thenReturn(cambriaErrorResponse);
288 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
289 initTask.setCambriaHandler(cambriaHandler);
292 boolean initFlow = initTask.initFlow();
293 assertTrue("check init flow succeed", initFlow);
294 } catch (Exception e) {
295 fail("Should not throw exception");
300 @SuppressWarnings("unchecked")
302 public void testInitFlowScenarioFailToRegister() {
304 String notifTopic = "notif";
305 String statusTopic = "status";
307 List<String> uebServers = new ArrayList<>();
308 uebServers.add("server1");
309 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
310 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
311 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
313 String envName = "PrOD";
315 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
316 deConfiguration.setUebServers(uebServers);
319 deConfiguration.setInitRetryIntervalSec(retry);
320 deConfiguration.setInitMaxIntervalSec(maxRetry);
321 deConfiguration.setDistributionNotifTopicName(notifTopic);
322 deConfiguration.setDistributionStatusTopicName(statusTopic);
323 CreateTopicConfig createTopic = new CreateTopicConfig();
324 createTopic.setPartitionCount(1);
325 createTopic.setReplicationCount(1);
326 deConfiguration.setCreateTopic(createTopic);
328 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
330 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
331 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
332 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
333 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
335 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic)))
336 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK));
338 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic)))
339 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR));
342 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
343 initTask.setCambriaHandler(cambriaHandler);
345 boolean initFlow = initTask.initFlow();
346 assertFalse("check init flow failed", initFlow);
350 @SuppressWarnings("unchecked")
352 public void testInitFlowScenario1GetTopicsFailed() {
354 List<String> uebServers = new ArrayList<>();
355 uebServers.add("server1");
356 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR);
357 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
358 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
360 String envName = "PrOD";
362 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
363 deConfiguration.setUebServers(uebServers);
366 deConfiguration.setInitRetryIntervalSec(retry);
367 deConfiguration.setInitMaxIntervalSec(maxRetry);
368 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
369 initTask.setCambriaHandler(cambriaHandler);
371 boolean initFlow = initTask.initFlow();
372 assertFalse("check init flow failed", initFlow);
376 private OperationalEnvironmentEntry readEnvFromConfig(DistributionEngineConfiguration distributionEngineConfiguration) {
377 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
378 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
379 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
380 Set<String> puebEndpoints = new HashSet<>();
381 if(distributionEngineConfiguration.getUebServers() != null)
382 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
383 entry.setDmaapUebAddress(puebEndpoints);
384 String envName = "UNKNOWN";
385 if(CollectionUtils.isNotEmpty(distributionEngineConfiguration.getEnvironments()))
386 envName = distributionEngineConfiguration.getEnvironments().get(0);
387 entry.setEnvironmentId(envName);