2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import fj.data.Either;
24 import org.apache.commons.collections.CollectionUtils;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.Mockito;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
32 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.api.ConfigurationSource;
36 import org.openecomp.sdc.common.impl.ExternalConfiguration;
37 import org.openecomp.sdc.common.impl.FSConfigurationSource;
40 import java.util.ArrayList;
41 import java.util.HashSet;
42 import java.util.List;
44 import java.util.concurrent.atomic.AtomicBoolean;
46 import static org.junit.Assert.*;
47 import static org.junit.Assert.fail;
48 import static org.mockito.Mockito.when;
50 public class DistributionEngineInitTaskTest {
53 private ComponentsUtils componentsUtils;
56 private CambriaHandler cambriaHandler;
60 ExternalConfiguration.setAppName("catalog-be");
61 ExternalConfiguration.setConfigDir("src/test/resources/config");
62 ExternalConfiguration.listenForChanges();
64 ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), ExternalConfiguration.getConfigDir() + File.separator + ExternalConfiguration.getAppName());
65 new ConfigurationManager(configurationSource);
67 componentsUtils = Mockito.mock(ComponentsUtils.class);
68 cambriaHandler = Mockito.mock(CambriaHandler.class);
72 public void checkIncrement() {
74 String envName = "PrOD";
76 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
79 deConfiguration.setInitRetryIntervalSec(retry);
80 deConfiguration.setInitMaxIntervalSec(maxRetry);
81 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
83 for (int i = 1; i < 5; i++) {
84 initTask.incrementRetryInterval();
85 assertEquals("check next retry interval", initTask.getCurrentRetryInterval(), retry * (long) Math.pow(2, i));
88 initTask.incrementRetryInterval();
89 assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
94 public void checkStartTask() {
96 String envName = "PrOD";
98 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
101 deConfiguration.setInitRetryIntervalSec(retry);
102 deConfiguration.setInitMaxIntervalSec(maxRetry);
103 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
105 initTask.startTask();
109 public void checkRestartTask() {
111 String envName = "PrOD";
113 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
116 deConfiguration.setInitRetryIntervalSec(retry);
117 deConfiguration.setInitMaxIntervalSec(maxRetry);
118 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
120 initTask.restartTask();
124 public void checkStopTask() {
126 String envName = "PrOD";
128 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
131 deConfiguration.setInitRetryIntervalSec(retry);
132 deConfiguration.setInitMaxIntervalSec(maxRetry);
133 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
136 initTask.startTask();
141 public void checkDestroy() {
143 String envName = "PrOD";
145 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
148 deConfiguration.setInitRetryIntervalSec(retry);
149 deConfiguration.setInitMaxIntervalSec(maxRetry);
150 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
156 public void checkRun() {
158 String notifTopic = "notif";
159 String statusTopic = "status";
161 List<String> uebServers = new ArrayList<>();
162 uebServers.add("server1");
163 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
164 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
165 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
167 String envName = "PrOD";
169 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
170 deConfiguration.setUebServers(uebServers);
173 deConfiguration.setInitRetryIntervalSec(retry);
174 deConfiguration.setInitMaxIntervalSec(maxRetry);
175 deConfiguration.setDistributionNotifTopicName(notifTopic);
176 deConfiguration.setDistributionStatusTopicName(statusTopic);
177 CreateTopicConfig createTopic = new CreateTopicConfig();
178 createTopic.setPartitionCount(1);
179 createTopic.setReplicationCount(1);
180 deConfiguration.setCreateTopic(createTopic);
182 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
184 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
185 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
186 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
187 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
189 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
190 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
191 .thenReturn(cambriaErrorResponse);
193 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
194 initTask.setCambriaHandler(cambriaHandler);
196 boolean initFlow = initTask.initFlow();
201 @SuppressWarnings("unchecked")
203 public void testInitFlowScenarioSuccess() {
205 String notifTopic = "notif";
206 String statusTopic = "status";
208 List<String> uebServers = new ArrayList<>();
209 uebServers.add("server1");
210 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
211 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
212 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
214 String envName = "PrOD";
216 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
217 deConfiguration.setUebServers(uebServers);
220 deConfiguration.setInitRetryIntervalSec(retry);
221 deConfiguration.setInitMaxIntervalSec(maxRetry);
222 deConfiguration.setDistributionNotifTopicName(notifTopic);
223 deConfiguration.setDistributionStatusTopicName(statusTopic);
224 CreateTopicConfig createTopic = new CreateTopicConfig();
225 createTopic.setPartitionCount(1);
226 createTopic.setReplicationCount(1);
227 deConfiguration.setCreateTopic(createTopic);
229 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
231 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
232 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
233 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
234 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
236 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
237 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
238 .thenReturn(cambriaErrorResponse);
240 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
241 initTask.setCambriaHandler(cambriaHandler);
243 boolean initFlow = initTask.initFlow();
244 assertTrue("check init flow succeed", initFlow);
248 @SuppressWarnings("unchecked")
250 public void testInitFlowScenarioSuccessTopicsAlreadyExists() {
252 String envName = "PrOD";
253 String notifTopic = "notif";
254 String statusTopic = "status";
256 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
257 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
259 Set<String> topics = new HashSet<>();
260 topics.add(realNotifTopic);
261 topics.add(realStatusTopic);
263 List<String> uebServers = new ArrayList<>();
264 uebServers.add("server1");
265 Either<Set<String>, CambriaErrorResponse> left = Either.left(topics);
267 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(left);
269 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
270 deConfiguration.setUebServers(uebServers);
273 deConfiguration.setInitRetryIntervalSec(retry);
274 deConfiguration.setInitMaxIntervalSec(maxRetry);
275 deConfiguration.setDistributionNotifTopicName(notifTopic);
276 deConfiguration.setDistributionStatusTopicName(statusTopic);
277 CreateTopicConfig createTopic = new CreateTopicConfig();
278 createTopic.setPartitionCount(1);
279 createTopic.setReplicationCount(1);
280 deConfiguration.setCreateTopic(createTopic);
282 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
283 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
284 .thenReturn(cambriaErrorResponse);
286 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
287 initTask.setCambriaHandler(cambriaHandler);
290 boolean initFlow = initTask.initFlow();
291 assertTrue("check init flow succeed", initFlow);
292 } catch (Exception e) {
293 fail("Should not throw exception");
298 @SuppressWarnings("unchecked")
300 public void testInitFlowScenarioFailToRegister() {
302 String notifTopic = "notif";
303 String statusTopic = "status";
305 List<String> uebServers = new ArrayList<>();
306 uebServers.add("server1");
307 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
308 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
309 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
311 String envName = "PrOD";
313 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
314 deConfiguration.setUebServers(uebServers);
317 deConfiguration.setInitRetryIntervalSec(retry);
318 deConfiguration.setInitMaxIntervalSec(maxRetry);
319 deConfiguration.setDistributionNotifTopicName(notifTopic);
320 deConfiguration.setDistributionStatusTopicName(statusTopic);
321 CreateTopicConfig createTopic = new CreateTopicConfig();
322 createTopic.setPartitionCount(1);
323 createTopic.setReplicationCount(1);
324 deConfiguration.setCreateTopic(createTopic);
326 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
328 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
329 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
330 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
331 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
333 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic)))
334 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK));
336 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic)))
337 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR));
340 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
341 initTask.setCambriaHandler(cambriaHandler);
343 boolean initFlow = initTask.initFlow();
344 assertFalse("check init flow failed", initFlow);
348 @SuppressWarnings("unchecked")
350 public void testInitFlowScenario1GetTopicsFailed() {
352 List<String> uebServers = new ArrayList<>();
353 uebServers.add("server1");
354 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR);
355 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
356 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
358 String envName = "PrOD";
360 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
361 deConfiguration.setUebServers(uebServers);
364 deConfiguration.setInitRetryIntervalSec(retry);
365 deConfiguration.setInitMaxIntervalSec(maxRetry);
366 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
367 initTask.setCambriaHandler(cambriaHandler);
369 boolean initFlow = initTask.initFlow();
370 assertFalse("check init flow failed", initFlow);
374 private OperationalEnvironmentEntry readEnvFromConfig(DistributionEngineConfiguration distributionEngineConfiguration) {
375 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
376 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
377 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
378 Set<String> puebEndpoints = new HashSet<>();
379 if(distributionEngineConfiguration.getUebServers() != null)
380 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
381 entry.setDmaapUebAddress(puebEndpoints);
382 String envName = "UNKNOWN";
383 if(CollectionUtils.isNotEmpty(distributionEngineConfiguration.getEnvironments()))
384 envName = distributionEngineConfiguration.getEnvironments().get(0);
385 entry.setEnvironmentId(envName);