re base code
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEngineInitTask.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import fj.data.Either;
24 import org.openecomp.sdc.be.config.BeEcompErrorManager;
25 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
26 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
27 import org.openecomp.sdc.be.impl.ComponentsUtils;
28 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
29 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
30 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
31 import org.openecomp.sdc.common.log.wrappers.Logger;
32
33 import java.util.HashSet;
34 import java.util.Set;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.stream.Collectors;
41
42 public class DistributionEngineInitTask implements Runnable {
43
44     public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
45     public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
46     public static final String CONSUMER = "CONSUMER";
47     public static final String PRODUCER = "PRODUCER";
48     public static final String CREATED = "CREATED";
49     public static final String FAILED = "FAILED";
50     public static final Integer HTTP_OK = 200;
51
52     private Long delayBeforeStartFlow = 0l;
53     private DistributionEngineConfiguration deConfiguration;
54     private String envName;
55     private long retryInterval;
56     private long currentRetryInterval;
57     private long maxInterval;
58     boolean maximumRetryInterval = false;
59     private AtomicBoolean status = null;
60     ComponentsUtils componentsUtils = null;
61     DistributionEnginePollingTask distributionEnginePollingTask = null;
62     private OperationalEnvironmentEntry environmentEntry;
63
64     private CambriaHandler cambriaHandler = new CambriaHandler();
65
66     public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
67         super();
68         this.delayBeforeStartFlow = delayBeforeStartFlow;
69         this.deConfiguration = deConfiguration;
70         this.envName = envName;
71         this.retryInterval = deConfiguration.getInitRetryIntervalSec();
72         this.currentRetryInterval = retryInterval;
73         this.maxInterval = deConfiguration.getInitMaxIntervalSec();
74         this.status = status;
75         this.componentsUtils = componentsUtils;
76         this.distributionEnginePollingTask = distributionEnginePollingTask;
77         this.environmentEntry = environmentEntry;
78     }
79
80     private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
81
82     private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName());
83
84     ScheduledFuture<?> scheduledFuture = null;
85
86     public void startTask() {
87         if (scheduledExecutorService != null) {
88             Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
89             logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
90             this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
91
92         }
93     }
94
95     public void restartTask() {
96
97         this.stopTask();
98
99         logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
100
101         long lastCurrentInterval = currentRetryInterval;
102         incrementRetryInterval();
103
104         this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
105
106     }
107
108     protected void incrementRetryInterval() {
109         if (currentRetryInterval < maxInterval) {
110             currentRetryInterval *= 2;
111             if (currentRetryInterval > maxInterval) {
112                 setMaxRetryInterval();
113             }
114         } else {
115             setMaxRetryInterval();
116         }
117     }
118
119     private void setMaxRetryInterval() {
120         currentRetryInterval = maxInterval;
121         maximumRetryInterval = true;
122         logger.debug("Set next retry init interval to {}", maxInterval);
123     }
124
125     public void stopTask() {
126         if (scheduledFuture != null) {
127             boolean result = scheduledFuture.cancel(true);
128             logger.debug("Stop reinit task. result = {}", result);
129             if (!result) {
130                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
131             }
132             scheduledFuture = null;
133         }
134     }
135
136     public void destroy() {
137         this.stopTask();
138         if (scheduledExecutorService != null) {
139             scheduledExecutorService.shutdown();
140         }
141     }
142
143     @Override
144     public void run() {
145
146         boolean result = false;
147         result = initFlow();
148
149         if (result) {
150             this.stopTask();
151             this.status.set(true);
152             if (this.distributionEnginePollingTask != null) {
153                 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
154                 logger.debug("start polling distribution status topic {}", topicName);
155                 this.distributionEnginePollingTask.startTask(topicName);
156             }
157         } else {
158             if (!maximumRetryInterval) {
159                 this.restartTask();
160             }
161         }
162     }
163
164     /**
165      * run initialization flow
166      *
167      * @return
168      */
169     public boolean initFlow() {
170
171         logger.trace("Start init flow for environment {}", this.envName);
172
173         Set<String> topicsList = null;
174         Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
175
176         getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
177         if (getTopicsRes.isRight()) {
178             CambriaErrorResponse status = getTopicsRes.right().value();
179             if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
180                 topicsList = new HashSet<>();
181             } else {
182                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
183                 return false;
184             }
185         } else {
186             topicsList = getTopicsRes.left().value();
187         }
188
189         String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
190         logger.debug("Going to handle topic {}", notificationTopic);
191         if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
192             return false;
193         }
194
195         CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
196
197         CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
198
199         if (createStatus != CambriaOperationStatus.OK) {
200             return false;
201         }
202
203         String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
204         logger.debug("Going to handle topic {}", statusTopic);
205         if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
206             return false;
207         }
208
209         CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
210
211         if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
212             return false;
213         }
214
215         return true;
216     }
217
218     private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
219         CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
220
221         String role = CONSUMER;
222         if (subscriberType == SubscriberTypeEnum.PRODUCER) {
223             role = PRODUCER;
224         }
225         auditRegistration(topicName, registerStatus, role);
226         return registerStatus;
227     }
228
229     private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
230         if (componentsUtils != null) {
231             Integer httpCode = registerProducerStatus.getHttpCode();
232             String httpCodeStr = String.valueOf(httpCode);
233             this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName,
234                     DistributionTopicData.newBuilder()
235                             .notificationTopic(notificationTopic)
236                             .build(),
237                     role, environmentEntry.getUebApikey(), httpCodeStr);
238         }
239     }
240
241     private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) {
242         DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
243                 .statusTopic(topicName)
244                 .build();
245         return createDistributionTopic(topicsList, topicName, distributionTopicData);
246     }
247
248     private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) {
249
250         DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
251                 .notificationTopic(topicName)
252                 .build();
253         return createDistributionTopic(topicsList, topicName, distributionTopicData);
254     }
255
256     private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) {
257
258         boolean isSucceeded = true;
259
260         if (topicsList.contains(topicName)) {
261             if (componentsUtils != null) {
262                 componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
263             }
264             return isSucceeded;
265         }
266         CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
267                 deConfiguration.getCreateTopic().getReplicationCount());
268
269         CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
270         switch (status) {
271             case OK:
272                 if (componentsUtils != null) {
273                     componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED);
274                 }
275                 break;
276             case TOPIC_ALREADY_EXIST:
277                 if (componentsUtils != null) {
278                     componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
279                 }
280                 break;
281             default:
282                 if (componentsUtils != null) {
283                     componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED);
284                 }
285                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
286                 isSucceeded = false;
287                 break;
288         }
289         return isSucceeded;
290     }
291
292     public static String buildTopicName(String topicName, String environment) {
293         return topicName + "-" + environment.toUpperCase();
294     }
295
296     public boolean isActive() {
297         return this.status.get();
298     }
299
300     public long getCurrentRetryInterval() {
301         return currentRetryInterval;
302     }
303
304     protected void setCambriaHandler(CambriaHandler cambriaHandler) {
305         this.cambriaHandler = cambriaHandler;
306     }
307 }