2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import fj.data.Either;
24 import org.apache.commons.collections.CollectionUtils;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.Mockito;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
32 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.api.ConfigurationSource;
36 import org.openecomp.sdc.common.impl.ExternalConfiguration;
37 import org.openecomp.sdc.common.impl.FSConfigurationSource;
40 import java.util.ArrayList;
41 import java.util.HashSet;
42 import java.util.List;
44 import java.util.concurrent.atomic.AtomicBoolean;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.when;
51 public class DistributionEngineInitTaskTest {
54 private ComponentsUtils componentsUtils;
57 private CambriaHandler cambriaHandler;
61 ExternalConfiguration.setAppName("catalog-be");
62 ExternalConfiguration.setConfigDir("src/test/resources/config");
63 ExternalConfiguration.listenForChanges();
65 ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), ExternalConfiguration.getConfigDir() + File.separator + ExternalConfiguration.getAppName());
66 new ConfigurationManager(configurationSource);
68 componentsUtils = Mockito.mock(ComponentsUtils.class);
69 cambriaHandler = Mockito.mock(CambriaHandler.class);
73 public void checkIncrement() {
75 String envName = "PrOD";
77 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
80 deConfiguration.setInitRetryIntervalSec(retry);
81 deConfiguration.setInitMaxIntervalSec(maxRetry);
82 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
84 for (int i = 1; i < 5; i++) {
85 initTask.incrementRetryInterval();
86 assertEquals("check next retry interval", initTask.getCurrentRetryInterval(), retry * (long) Math.pow(2, i));
89 initTask.incrementRetryInterval();
90 assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
94 @SuppressWarnings("unchecked")
96 public void testInitFlowScenarioSuccess() {
98 String notifTopic = "notif";
99 String statusTopic = "status";
101 List<String> uebServers = new ArrayList<>();
102 uebServers.add("server1");
103 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
104 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
105 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
107 String envName = "PrOD";
109 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
110 deConfiguration.setUebServers(uebServers);
113 deConfiguration.setInitRetryIntervalSec(retry);
114 deConfiguration.setInitMaxIntervalSec(maxRetry);
115 deConfiguration.setDistributionNotifTopicName(notifTopic);
116 deConfiguration.setDistributionStatusTopicName(statusTopic);
117 CreateTopicConfig createTopic = new CreateTopicConfig();
118 createTopic.setPartitionCount(1);
119 createTopic.setReplicationCount(1);
120 deConfiguration.setCreateTopic(createTopic);
122 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
124 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
125 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
126 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
127 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
129 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
130 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
131 .thenReturn(cambriaErrorResponse);
133 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
134 initTask.setCambriaHandler(cambriaHandler);
136 boolean initFlow = initTask.initFlow();
137 assertTrue("check init flow succeed", initFlow);
141 @SuppressWarnings("unchecked")
143 public void testInitFlowScenarioSuccessTopicsAlreadyExists() {
145 String envName = "PrOD";
146 String notifTopic = "notif";
147 String statusTopic = "status";
149 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
150 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
152 Set<String> topics = new HashSet<String>();
153 topics.add(realNotifTopic);
154 topics.add(realStatusTopic);
156 List<String> uebServers = new ArrayList<>();
157 uebServers.add("server1");
158 Either<Set<String>, CambriaErrorResponse> left = Either.left(topics);
160 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(left);
162 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
163 deConfiguration.setUebServers(uebServers);
166 deConfiguration.setInitRetryIntervalSec(retry);
167 deConfiguration.setInitMaxIntervalSec(maxRetry);
168 deConfiguration.setDistributionNotifTopicName(notifTopic);
169 deConfiguration.setDistributionStatusTopicName(statusTopic);
170 CreateTopicConfig createTopic = new CreateTopicConfig();
171 createTopic.setPartitionCount(1);
172 createTopic.setReplicationCount(1);
173 deConfiguration.setCreateTopic(createTopic);
175 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
176 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
177 .thenReturn(cambriaErrorResponse);
179 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
180 initTask.setCambriaHandler(cambriaHandler);
183 boolean initFlow = initTask.initFlow();
184 assertTrue("check init flow succeed", initFlow);
185 } catch (Exception e) {
186 assertTrue("Should not throw exception", false);
191 @SuppressWarnings("unchecked")
193 public void testInitFlowScenarioFailToRegister() {
195 String notifTopic = "notif";
196 String statusTopic = "status";
198 List<String> uebServers = new ArrayList<>();
199 uebServers.add("server1");
200 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
201 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
202 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
204 String envName = "PrOD";
206 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
207 deConfiguration.setUebServers(uebServers);
210 deConfiguration.setInitRetryIntervalSec(retry);
211 deConfiguration.setInitMaxIntervalSec(maxRetry);
212 deConfiguration.setDistributionNotifTopicName(notifTopic);
213 deConfiguration.setDistributionStatusTopicName(statusTopic);
214 CreateTopicConfig createTopic = new CreateTopicConfig();
215 createTopic.setPartitionCount(1);
216 createTopic.setReplicationCount(1);
217 deConfiguration.setCreateTopic(createTopic);
219 cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
221 String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
222 String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
223 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
224 when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
226 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic)))
227 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK));
229 when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic)))
230 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR));
233 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
234 initTask.setCambriaHandler(cambriaHandler);
236 boolean initFlow = initTask.initFlow();
237 assertFalse("check init flow failed", initFlow);
241 @SuppressWarnings("unchecked")
243 public void testInitFlowScenario1GetTopicsFailed() {
245 List<String> uebServers = new ArrayList<>();
246 uebServers.add("server1");
247 CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR);
248 Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
249 when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
251 String envName = "PrOD";
253 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
254 deConfiguration.setUebServers(uebServers);
257 deConfiguration.setInitRetryIntervalSec(retry);
258 deConfiguration.setInitMaxIntervalSec(maxRetry);
259 DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
260 initTask.setCambriaHandler(cambriaHandler);
262 boolean initFlow = initTask.initFlow();
263 assertFalse("check init flow failed", initFlow);
267 private OperationalEnvironmentEntry readEnvFromConfig(DistributionEngineConfiguration distributionEngineConfiguration) {
268 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
269 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
270 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
271 Set<String> puebEndpoints = new HashSet<>();
272 if(distributionEngineConfiguration.getUebServers() != null)
273 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
274 entry.setDmaapUebAddress(puebEndpoints);
275 String envName = "UNKNOWN";
276 if(CollectionUtils.isNotEmpty(distributionEngineConfiguration.getEnvironments()))
277 envName = distributionEngineConfiguration.getEnvironments().get(0);
278 entry.setEnvironmentId(envName);