043efdf81d3fc5d59e5e862584fafcada6326384
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import java.util.HashSet;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.stream.Collectors;
31
32 import org.openecomp.sdc.be.config.BeEcompErrorManager;
33 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
34 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
35 import org.openecomp.sdc.be.impl.ComponentsUtils;
36 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
37 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import fj.data.Either;
42
43 public class DistributionEngineInitTask implements Runnable {
44
45     public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
46     public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
47     public static final String CONSUMER = "CONSUMER";
48     public static final String PRODUCER = "PRODUCER";
49     public static final String CREATED = "CREATED";
50     public static final String FAILED = "FAILED";
51     public static final Integer HTTP_OK = 200;
52
53     private Long delayBeforeStartFlow = 0l;
54     private DistributionEngineConfiguration deConfiguration;
55     private String envName;
56     private long retryInterval;
57     private long currentRetryInterval;
58     private long maxInterval;
59     boolean maximumRetryInterval = false;
60     private AtomicBoolean status = null;
61     ComponentsUtils componentsUtils = null;
62     DistributionEnginePollingTask distributionEnginePollingTask = null;
63     private OperationalEnvironmentEntry environmentEntry;
64
65     private CambriaHandler cambriaHandler = new CambriaHandler();
66
67     public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
68         super();
69         this.delayBeforeStartFlow = delayBeforeStartFlow;
70         this.deConfiguration = deConfiguration;
71         this.envName = envName;
72         this.retryInterval = deConfiguration.getInitRetryIntervalSec();
73         this.currentRetryInterval = retryInterval;
74         this.maxInterval = deConfiguration.getInitMaxIntervalSec();
75         this.status = status;
76         this.componentsUtils = componentsUtils;
77         this.distributionEnginePollingTask = distributionEnginePollingTask;
78         this.environmentEntry = environmentEntry;
79     }
80
81     private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
82
83     private static final Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class);
84
85     ScheduledFuture<?> scheduledFuture = null;
86
87     public void startTask() {
88         if (scheduledExecutorService != null) {
89             Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
90             logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
91             this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
92
93         }
94     }
95
96     public void restartTask() {
97
98         this.stopTask();
99
100         logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
101
102         long lastCurrentInterval = currentRetryInterval;
103         incrementRetryInterval();
104
105         this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
106
107     }
108
109     protected void incrementRetryInterval() {
110         if (currentRetryInterval < maxInterval) {
111             currentRetryInterval *= 2;
112             if (currentRetryInterval > maxInterval) {
113                 setMaxRetryInterval();
114             }
115         } else {
116             setMaxRetryInterval();
117         }
118     }
119
120     private void setMaxRetryInterval() {
121         currentRetryInterval = maxInterval;
122         maximumRetryInterval = true;
123         logger.debug("Set next retry init interval to {}", maxInterval);
124     }
125
126     public void stopTask() {
127         if (scheduledFuture != null) {
128             boolean result = scheduledFuture.cancel(true);
129             logger.debug("Stop reinit task. result = {}", result);
130             if (false == result) {
131                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
132             }
133             scheduledFuture = null;
134         }
135     }
136
137     public void destroy() {
138         this.stopTask();
139         if (scheduledExecutorService != null) {
140             scheduledExecutorService.shutdown();
141         }
142     }
143
144     @Override
145     public void run() {
146
147         boolean result = false;
148         result = initFlow();
149
150         if (true == result) {
151             this.stopTask();
152             this.status.set(true);
153             if (this.distributionEnginePollingTask != null) {
154                 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
155                 logger.debug("start polling distribution status topic {}", topicName);
156                 this.distributionEnginePollingTask.startTask(topicName);
157             }
158         } else {
159             if (false == maximumRetryInterval) {
160                 this.restartTask();
161             }
162         }
163     }
164
165     /**
166      * run initialization flow
167      *
168      * @return
169      */
170     public boolean initFlow() {
171
172         logger.trace("Start init flow for environment {}", this.envName);
173
174         Set<String> topicsList = null;
175         Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
176
177         getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
178         if (getTopicsRes.isRight()) {
179             CambriaErrorResponse status = getTopicsRes.right().value();
180             if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
181                 topicsList = new HashSet<>();
182             } else {
183                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
184                 return false;
185             }
186         } else {
187             topicsList = getTopicsRes.left().value();
188         }
189
190         String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
191         logger.debug("Going to handle topic {}", notificationTopic);
192
193         boolean status = createTopicIfNotExists(topicsList, notificationTopic);
194         if (false == status) {
195             return false;
196         }
197
198         CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
199
200         CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
201
202         if (createStatus != CambriaOperationStatus.OK) {
203             return false;
204         }
205
206         String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
207         logger.debug("Going to handle topic {}", statusTopic);
208         status = createTopicIfNotExists(topicsList, statusTopic);
209         if (false == status) {
210             return false;
211         }
212
213         CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
214
215         if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
216             return false;
217         }
218
219         return true;
220     }
221
222     private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
223         CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
224
225         String role = CONSUMER;
226         if (subscriberType == SubscriberTypeEnum.PRODUCER) {
227             role = PRODUCER;
228         }
229         auditRegistration(topicName, registerStatus, role);
230         return registerStatus;
231     }
232
233     private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
234         if (componentsUtils != null) {
235             Integer httpCode = registerProducerStatus.getHttpCode();
236             String httpCodeStr = String.valueOf(httpCode);
237             this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, environmentEntry.getUebApikey(), httpCodeStr);
238         }
239     }
240
241     private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
242
243         if (topicsList.contains(topicName)) {
244             if (componentsUtils != null) {
245                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
246             }
247             return true;
248         }
249
250         CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
251                 deConfiguration.getCreateTopic().getReplicationCount());
252
253         CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
254         if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
255             if (componentsUtils != null) {
256                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
257             }
258         } else if (status == CambriaOperationStatus.OK) {
259             if (componentsUtils != null) {
260                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
261             }
262         } else {
263             if (componentsUtils != null) {
264                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
265             }
266             BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
267             return false;
268         }
269
270         return true;
271     }
272
273     public static String buildTopicName(String topicName, String environment) {
274         return topicName + "-" + environment.toUpperCase();
275     }
276
277     public boolean isActive() {
278         return this.status.get();
279     }
280
281     public long getCurrentRetryInterval() {
282         return currentRetryInterval;
283     }
284
285     protected void setCambriaHandler(CambriaHandler cambriaHandler) {
286         this.cambriaHandler = cambriaHandler;
287     }
288 }