52b19673977cd16730807f2e746fa95d654f9c6c
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import fj.data.Either;
24 import org.openecomp.sdc.be.config.BeEcompErrorManager;
25 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
26 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
27 import org.openecomp.sdc.be.impl.ComponentsUtils;
28 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
29 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
30 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
31 import org.openecomp.sdc.common.log.wrappers.Logger;
32
33 import java.util.HashSet;
34 import java.util.Set;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.stream.Collectors;
41
42 public class DistributionEngineInitTask implements Runnable {
43
44     public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
45     public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
46     public static final String CONSUMER = "CONSUMER";
47     public static final String PRODUCER = "PRODUCER";
48     public static final String CREATED = "CREATED";
49     public static final String FAILED = "FAILED";
50     public static final Integer HTTP_OK = 200;
51
52     private Long delayBeforeStartFlow = 0l;
53     private DistributionEngineConfiguration deConfiguration;
54     private String envName;
55     private long retryInterval;
56     private long currentRetryInterval;
57     private long maxInterval;
58     boolean maximumRetryInterval = false;
59     private AtomicBoolean status = null;
60     ComponentsUtils componentsUtils = null;
61     DistributionEnginePollingTask distributionEnginePollingTask = null;
62     private OperationalEnvironmentEntry environmentEntry;
63
64     private CambriaHandler cambriaHandler = new CambriaHandler();
65
66     private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
67     private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName());
68     ScheduledFuture<?> scheduledFuture = null;
69     
70     public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
71         super();
72         this.delayBeforeStartFlow = delayBeforeStartFlow;
73         this.deConfiguration = deConfiguration;
74         this.envName = envName;
75         this.retryInterval = deConfiguration.getInitRetryIntervalSec();
76         this.currentRetryInterval = retryInterval;
77         this.maxInterval = deConfiguration.getInitMaxIntervalSec();
78         this.status = status;
79         this.componentsUtils = componentsUtils;
80         this.distributionEnginePollingTask = distributionEnginePollingTask;
81         this.environmentEntry = environmentEntry;
82     }
83
84     public void startTask() {
85         if (scheduledExecutorService != null) {
86             Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
87             logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
88             this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
89
90         }
91     }
92
93     public void restartTask() {
94
95         this.stopTask();
96
97         logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
98
99         long lastCurrentInterval = currentRetryInterval;
100         incrementRetryInterval();
101
102         this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
103
104     }
105
106     protected void incrementRetryInterval() {
107         if (currentRetryInterval < maxInterval) {
108             currentRetryInterval *= 2;
109             if (currentRetryInterval > maxInterval) {
110                 setMaxRetryInterval();
111             }
112         } else {
113             setMaxRetryInterval();
114         }
115     }
116
117     private void setMaxRetryInterval() {
118         currentRetryInterval = maxInterval;
119         maximumRetryInterval = true;
120         logger.debug("Set next retry init interval to {}", maxInterval);
121     }
122
123     public void stopTask() {
124         if (scheduledFuture != null) {
125             boolean result = scheduledFuture.cancel(true);
126             logger.debug("Stop reinit task. result = {}", result);
127             if (!result) {
128                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
129             }
130             scheduledFuture = null;
131         }
132     }
133
134     public void destroy() {
135         this.stopTask();
136         if (scheduledExecutorService != null) {
137             scheduledExecutorService.shutdown();
138         }
139     }
140
141     @Override
142     public void run() {
143
144         boolean result = false;
145         result = initFlow();
146
147         if (result) {
148             this.stopTask();
149             this.status.set(true);
150             if (this.distributionEnginePollingTask != null) {
151                 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
152                 logger.debug("start polling distribution status topic {}", topicName);
153                 this.distributionEnginePollingTask.startTask(topicName);
154             }
155         } else {
156             if (!maximumRetryInterval) {
157                 this.restartTask();
158             }
159         }
160     }
161
162     /**
163      * run initialization flow
164      *
165      * @return
166      */
167     public boolean initFlow() {
168
169         logger.trace("Start init flow for environment {}", this.envName);
170
171         Set<String> topicsList = null;
172         Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
173
174         getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
175         if (getTopicsRes.isRight()) {
176             CambriaErrorResponse status = getTopicsRes.right().value();
177             if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
178                 topicsList = new HashSet<>();
179             } else {
180                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
181                 return false;
182             }
183         } else {
184             topicsList = getTopicsRes.left().value();
185         }
186
187         String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
188         logger.debug("Going to handle topic {}", notificationTopic);
189         if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
190             return false;
191         }
192
193         CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
194
195         CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
196
197         if (createStatus != CambriaOperationStatus.OK) {
198             return false;
199         }
200
201         String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
202         logger.debug("Going to handle topic {}", statusTopic);
203         if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
204             return false;
205         }
206
207         CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
208
209         return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
210     }
211
212     private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
213         CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
214
215         String role = CONSUMER;
216         if (subscriberType == SubscriberTypeEnum.PRODUCER) {
217             role = PRODUCER;
218         }
219         auditRegistration(topicName, registerStatus, role);
220         return registerStatus;
221     }
222
223     private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
224         if (componentsUtils != null) {
225             Integer httpCode = registerProducerStatus.getHttpCode();
226             String httpCodeStr = String.valueOf(httpCode);
227             this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName,
228                     DistributionTopicData.newBuilder()
229                             .notificationTopic(notificationTopic)
230                             .build(),
231                     role, environmentEntry.getUebApikey(), httpCodeStr);
232         }
233     }
234
235     private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) {
236         DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
237                 .statusTopic(topicName)
238                 .build();
239         return createDistributionTopic(topicsList, topicName, distributionTopicData);
240     }
241
242     private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) {
243
244         DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
245                 .notificationTopic(topicName)
246                 .build();
247         return createDistributionTopic(topicsList, topicName, distributionTopicData);
248     }
249
250     private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) {
251
252         boolean isSucceeded = true;
253
254         if (topicsList.contains(topicName)) {
255             if (componentsUtils != null) {
256                 componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
257             }
258             return isSucceeded;
259         }
260         CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
261                 deConfiguration.getCreateTopic().getReplicationCount());
262
263         CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
264         switch (status) {
265             case OK:
266                 if (componentsUtils != null) {
267                     componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED);
268                 }
269                 break;
270             case TOPIC_ALREADY_EXIST:
271                 if (componentsUtils != null) {
272                     componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
273                 }
274                 break;
275             default:
276                 if (componentsUtils != null) {
277                     componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED);
278                 }
279                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
280                 isSucceeded = false;
281                 break;
282         }
283         return isSucceeded;
284     }
285
286     public static String buildTopicName(String topicName, String environment) {
287         return topicName + "-" + environment.toUpperCase();
288     }
289
290     public boolean isActive() {
291         return this.status.get();
292     }
293
294     public long getCurrentRetryInterval() {
295         return currentRetryInterval;
296     }
297
298     protected void setCambriaHandler(CambriaHandler cambriaHandler) {
299         this.cambriaHandler = cambriaHandler;
300     }
301 }