Sync Integ to Master
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEngineInitTaskTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import fj.data.Either;
24 import org.apache.commons.collections.CollectionUtils;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.Mockito;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
32 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.api.ConfigurationSource;
36 import org.openecomp.sdc.common.impl.ExternalConfiguration;
37 import org.openecomp.sdc.common.impl.FSConfigurationSource;
38
39 import java.io.File;
40 import java.util.ArrayList;
41 import java.util.HashSet;
42 import java.util.List;
43 import java.util.Set;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.when;
50
51 public class DistributionEngineInitTaskTest {
52
53     @Mock
54     private ComponentsUtils componentsUtils;
55
56     @Mock
57     private CambriaHandler cambriaHandler;
58
59     @Before
60     public void setup() {
61         ExternalConfiguration.setAppName("catalog-be");
62         ExternalConfiguration.setConfigDir("src/test/resources/config");
63         ExternalConfiguration.listenForChanges();
64
65         ConfigurationSource configurationSource = new FSConfigurationSource(ExternalConfiguration.getChangeListener(), ExternalConfiguration.getConfigDir() + File.separator + ExternalConfiguration.getAppName());
66         new ConfigurationManager(configurationSource);
67
68         componentsUtils = Mockito.mock(ComponentsUtils.class);
69         cambriaHandler = Mockito.mock(CambriaHandler.class);
70     }
71
72     @Test
73     public void checkIncrement() {
74
75         String envName = "PrOD";
76
77         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
78         int retry = 2;
79         int maxRetry = 40;
80         deConfiguration.setInitRetryIntervalSec(retry);
81         deConfiguration.setInitMaxIntervalSec(maxRetry);
82         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
83
84         for (int i = 1; i < 5; i++) {
85             initTask.incrementRetryInterval();
86             assertEquals("check next retry interval", initTask.getCurrentRetryInterval(), retry * (long) Math.pow(2, i));
87         }
88
89         initTask.incrementRetryInterval();
90         assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
91
92     }
93
94     @SuppressWarnings("unchecked")
95     @Test
96     public void testInitFlowScenarioSuccess() {
97
98         String notifTopic = "notif";
99         String statusTopic = "status";
100
101         List<String> uebServers = new ArrayList<>();
102         uebServers.add("server1");
103         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
104         Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
105         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
106
107         String envName = "PrOD";
108
109         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
110         deConfiguration.setUebServers(uebServers);
111         int retry = 2;
112         int maxRetry = 40;
113         deConfiguration.setInitRetryIntervalSec(retry);
114         deConfiguration.setInitMaxIntervalSec(maxRetry);
115         deConfiguration.setDistributionNotifTopicName(notifTopic);
116         deConfiguration.setDistributionStatusTopicName(statusTopic);
117         CreateTopicConfig createTopic = new CreateTopicConfig();
118         createTopic.setPartitionCount(1);
119         createTopic.setReplicationCount(1);
120         deConfiguration.setCreateTopic(createTopic);
121
122         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
123
124         String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
125         String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
126         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
127         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
128
129         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
130         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
131                 .thenReturn(cambriaErrorResponse);
132
133         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
134         initTask.setCambriaHandler(cambriaHandler);
135
136         boolean initFlow = initTask.initFlow();
137         assertTrue("check init flow succeed", initFlow);
138
139     }
140
141     @SuppressWarnings("unchecked")
142     @Test
143     public void testInitFlowScenarioSuccessTopicsAlreadyExists() {
144
145         String envName = "PrOD";
146         String notifTopic = "notif";
147         String statusTopic = "status";
148
149         String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
150         String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
151
152         Set<String> topics = new HashSet<String>();
153         topics.add(realNotifTopic);
154         topics.add(realStatusTopic);
155
156         List<String> uebServers = new ArrayList<>();
157         uebServers.add("server1");
158         Either<Set<String>, CambriaErrorResponse> left = Either.left(topics);
159
160         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(left);
161
162         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
163         deConfiguration.setUebServers(uebServers);
164         int retry = 2;
165         int maxRetry = 40;
166         deConfiguration.setInitRetryIntervalSec(retry);
167         deConfiguration.setInitMaxIntervalSec(maxRetry);
168         deConfiguration.setDistributionNotifTopicName(notifTopic);
169         deConfiguration.setDistributionStatusTopicName(statusTopic);
170         CreateTopicConfig createTopic = new CreateTopicConfig();
171         createTopic.setPartitionCount(1);
172         createTopic.setReplicationCount(1);
173         deConfiguration.setCreateTopic(createTopic);
174
175         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
176         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
177                 .thenReturn(cambriaErrorResponse);
178
179         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
180         initTask.setCambriaHandler(cambriaHandler);
181
182         try {
183             boolean initFlow = initTask.initFlow();
184             assertTrue("check init flow succeed", initFlow);
185         } catch (Exception e) {
186             assertTrue("Should not throw exception", false);
187         }
188
189     }
190
191     @SuppressWarnings("unchecked")
192     @Test
193     public void testInitFlowScenarioFailToRegister() {
194
195         String notifTopic = "notif";
196         String statusTopic = "status";
197
198         List<String> uebServers = new ArrayList<>();
199         uebServers.add("server1");
200         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.NOT_FOUND);
201         Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
202         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
203
204         String envName = "PrOD";
205
206         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
207         deConfiguration.setUebServers(uebServers);
208         int retry = 2;
209         int maxRetry = 40;
210         deConfiguration.setInitRetryIntervalSec(retry);
211         deConfiguration.setInitMaxIntervalSec(maxRetry);
212         deConfiguration.setDistributionNotifTopicName(notifTopic);
213         deConfiguration.setDistributionStatusTopicName(statusTopic);
214         CreateTopicConfig createTopic = new CreateTopicConfig();
215         createTopic.setPartitionCount(1);
216         createTopic.setReplicationCount(1);
217         deConfiguration.setCreateTopic(createTopic);
218
219         cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.OK);
220
221         String realNotifTopic = notifTopic + "-" + envName.toUpperCase();
222         String realStatusTopic = statusTopic + "-" + envName.toUpperCase();
223         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
224         when(cambriaHandler.createTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic), Mockito.eq(1), Mockito.eq(1))).thenReturn(cambriaErrorResponse);
225
226         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realNotifTopic)))
227                 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK));
228
229         when(cambriaHandler.registerToTopic(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(realStatusTopic)))
230                 .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR));
231
232
233         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
234         initTask.setCambriaHandler(cambriaHandler);
235
236         boolean initFlow = initTask.initFlow();
237         assertFalse("check init flow failed", initFlow);
238
239     }
240
241     @SuppressWarnings("unchecked")
242     @Test
243     public void testInitFlowScenario1GetTopicsFailed() {
244
245         List<String> uebServers = new ArrayList<>();
246         uebServers.add("server1");
247         CambriaErrorResponse cambriaErrorResponse = new CambriaErrorResponse(CambriaOperationStatus.CONNNECTION_ERROR);
248         Either<Set<String>, CambriaErrorResponse> right = Either.right(cambriaErrorResponse);
249         when(cambriaHandler.getTopics(Mockito.any(List.class))).thenReturn(right);
250
251         String envName = "PrOD";
252
253         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
254         deConfiguration.setUebServers(uebServers);
255         int retry = 2;
256         int maxRetry = 40;
257         deConfiguration.setInitRetryIntervalSec(retry);
258         deConfiguration.setInitMaxIntervalSec(maxRetry);
259         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
260         initTask.setCambriaHandler(cambriaHandler);
261
262         boolean initFlow = initTask.initFlow();
263         assertFalse("check init flow failed", initFlow);
264
265     }
266
267     private OperationalEnvironmentEntry readEnvFromConfig(DistributionEngineConfiguration distributionEngineConfiguration) {
268         OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
269         entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
270         entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
271         Set<String> puebEndpoints = new HashSet<>();
272         if(distributionEngineConfiguration.getUebServers() != null)
273             puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
274         entry.setDmaapUebAddress(puebEndpoints);
275         String envName = "UNKNOWN";
276         if(CollectionUtils.isNotEmpty(distributionEngineConfiguration.getEnvironments()))
277             envName = distributionEngineConfiguration.getEnvironments().get(0);
278         entry.setEnvironmentId(envName);
279         return entry;
280     }
281
282 }