2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import fj.data.Either;
24 import org.openecomp.sdc.be.config.BeEcompErrorManager;
25 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
26 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
27 import org.openecomp.sdc.be.impl.ComponentsUtils;
28 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
29 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
30 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
31 import org.openecomp.sdc.common.log.wrappers.Logger;
33 import java.util.HashSet;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.stream.Collectors;
42 public class DistributionEngineInitTask implements Runnable {
44 public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
45 public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
46 public static final String CONSUMER = "CONSUMER";
47 public static final String PRODUCER = "PRODUCER";
48 public static final String CREATED = "CREATED";
49 public static final String FAILED = "FAILED";
50 public static final Integer HTTP_OK = 200;
52 private Long delayBeforeStartFlow = 0l;
53 private DistributionEngineConfiguration deConfiguration;
54 private String envName;
55 private long retryInterval;
56 private long currentRetryInterval;
57 private long maxInterval;
58 boolean maximumRetryInterval = false;
59 private AtomicBoolean status = null;
60 ComponentsUtils componentsUtils = null;
61 DistributionEnginePollingTask distributionEnginePollingTask = null;
62 private OperationalEnvironmentEntry environmentEntry;
64 private CambriaHandler cambriaHandler = new CambriaHandler();
66 private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
67 private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName());
68 ScheduledFuture<?> scheduledFuture = null;
/**
 * Creates the init task for a single environment.
 *
 * Stores the supplied collaborators and reads the retry settings from the configuration:
 * the initial retry interval (which is also the starting value of the current retry
 * interval) and the maximum retry interval.
 *
 * NOTE(review): the {@code status} argument is not assigned to a field in the lines visible
 * here, although other methods of the class dereference {@code this.status}. This extract
 * appears to be missing lines - confirm against the complete file.
 *
 * @param delayBeforeStartFlow          delay before the first scheduled run; startTask passes it
 *                                      to the scheduler with TimeUnit.SECONDS
 * @param deConfiguration               distribution engine configuration; dereferenced here, so it
 *                                      must not be null
 * @param envName                       environment name, used when building topic names and in audit records
 * @param status                        shared flag (see the review note above)
 * @param componentsUtils               audit helper; the class null-checks it before every use
 * @param distributionEnginePollingTask polling task; the class null-checks it before starting it
 * @param environmentEntry              source of the UEB addresses and API/secret keys used for Cambria calls
 */
70 public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
72 this.delayBeforeStartFlow = delayBeforeStartFlow;
73 this.deConfiguration = deConfiguration;
74 this.envName = envName;
75 this.retryInterval = deConfiguration.getInitRetryIntervalSec();
// The back-off starts from the configured initial interval.
76 this.currentRetryInterval = retryInterval;
77 this.maxInterval = deConfiguration.getInitMaxIntervalSec();
79 this.componentsUtils = componentsUtils;
80 this.distributionEnginePollingTask = distributionEnginePollingTask;
81 this.environmentEntry = environmentEntry;
84 public void startTask() {
85 if (scheduledExecutorService != null) {
86 Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
87 logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
88 this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
// Re-schedules this task with a longer period: the first run is delayed by the current retry
// interval, and the fixed-rate period is the interval produced by incrementRetryInterval().
// NOTE(review): no cancellation of a previously scheduled future is visible in these lines;
// confirm against the complete file whether the old schedule is stopped before re-scheduling.
93 public void restartTask() {
97 logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
// Keep the pre-increment value: it is used as the initial delay of the new schedule.
99 long lastCurrentInterval = currentRetryInterval;
100 incrementRetryInterval();
102 this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
// Advances the back-off: while the current retry interval is below the maximum it is doubled,
// and a doubled value that overshoots the maximum is clamped via setMaxRetryInterval().
106 protected void incrementRetryInterval() {
107 if (currentRetryInterval < maxInterval) {
108 currentRetryInterval *= 2;
109 if (currentRetryInterval > maxInterval) {
110 setMaxRetryInterval();
// NOTE(review): the branch delimiters around the next call are not visible in this extract;
// presumably it is the else-branch of the outer check (interval already at or above the
// maximum) - confirm against the complete file.
113 setMaxRetryInterval();
117 private void setMaxRetryInterval() {
118 currentRetryInterval = maxInterval;
119 maximumRetryInterval = true;
120 logger.debug("Set next retry init interval to {}", maxInterval);
// Cancels the currently scheduled execution, if there is one, and clears the stored future.
123 public void stopTask() {
124 if (scheduledFuture != null) {
// cancel(true): the flag allows an execution that is already running to be interrupted.
125 boolean result = scheduledFuture.cancel(true);
126 logger.debug("Stop reinit task. result = {}", result);
// NOTE(review): any condition guarding this error log is not visible in this extract;
// presumably it is reported only when the cancellation failed - confirm.
128 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
// Drop the handle so a later call sees nothing to cancel.
130 scheduledFuture = null;
// Releases the scheduler owned by this task by shutting the executor down, when one exists.
// NOTE(review): the line numbering suggests a statement between the declaration and the null
// check that is not visible in this extract - confirm against the complete file.
134 public void destroy() {
136 if (scheduledExecutorService != null) {
137 scheduledExecutorService.shutdown();
// NOTE(review): the method declaration for these statements is not present in this extract.
// As the class implements Runnable this is presumably the body of run() - confirm. The
// statements that assign `result` and branch on it are not visible either.
144 boolean result = false;
// Raise the shared flag that isActive() reports.
149 this.status.set(true);
// Status-topic polling is started only when a polling task was supplied.
150 if (this.distributionEnginePollingTask != null) {
151 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
152 logger.debug("start polling distribution status topic {}", topicName);
153 this.distributionEnginePollingTask.startTask(topicName);
// maximumRetryInterval is set once the back-off has reached its ceiling; the action taken
// while it is still false is not visible here (presumably a re-schedule - confirm).
156 if (!maximumRetryInterval) {
// Initialization flow for this environment, in the order visible below:
//   1. fetch the existing topics from the UEB servers of the environment;
//   2. handle the notification topic and register on it as PRODUCER;
//   3. handle the status topic and register on it as CONSUMER.
// The visible return yields true only when the consumer registration reports OK.
// NOTE(review): the outcomes of the failing branches (topic retrieval error, topic creation
// failure, producer registration not OK) are not visible in this extract; presumably each of
// them aborts the flow - confirm against the complete file.
163 * run initialization flow
167 public boolean initFlow() {
169 logger.trace("Start init flow for environment {}", this.envName);
171 Set<String> topicsList = null;
172 Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
// The UEB addresses of the environment are copied into a List for the handler call.
174 getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
175 if (getTopicsRes.isRight()) {
176 CambriaErrorResponse status = getTopicsRes.right().value();
// NOT_FOUND is treated as "no topics exist yet": continue with an empty set.
177 if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
178 topicsList = new HashSet<>();
// Any other retrieval error is reported as a UEB system error.
180 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
// Left side of the Either: the set of topic names returned by the handler.
184 topicsList = getTopicsRes.left().value();
// --- notification topic ---
187 String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
188 logger.debug("Going to handle topic {}", notificationTopic);
189 if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
193 CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
195 CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
197 if (createStatus != CambriaOperationStatus.OK) {
// --- status topic ---
201 String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
202 logger.debug("Going to handle topic {}", statusTopic);
203 if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
207 CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
209 return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
// Registers the environment's API key on the given topic with the requested subscriber type,
// writes an audit record for the attempt, and returns the handler's response unchanged.
// NOTE(review): environmentEntry.getUebApikey() is passed twice (2nd and 4th argument); what
// the 4th parameter of CambriaHandler#registerToTopic means cannot be seen from here - verify.
212 private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
213 CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
// The audit role defaults to CONSUMER. The body of the PRODUCER branch is not visible in this
// extract; presumably it switches the role to PRODUCER - confirm.
215 String role = CONSUMER;
216 if (subscriberType == SubscriberTypeEnum.PRODUCER) {
219 auditRegistration(topicName, registerStatus, role);
220 return registerStatus;
// Writes an ADD_KEY_TO_TOPIC_ACL audit record for a topic registration; a no-op when no
// ComponentsUtils instance was supplied. Despite the "notification"/"Producer" parameter
// names, registerToTopic passes whatever topic and role it was invoked with.
223 private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
224 if (componentsUtils != null) {
// The HTTP code of the registration response is audited in its string form.
225 Integer httpCode = registerProducerStatus.getHttpCode();
226 String httpCodeStr = String.valueOf(httpCode);
227 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName,
228 DistributionTopicData.newBuilder()
229 .notificationTopic(notificationTopic)
// NOTE(review): the end of the builder chain is not visible in this extract.
231 role, environmentEntry.getUebApikey(), httpCodeStr);
// Handles the status topic: prepares audit data that carries the topic name in its
// status-topic field and delegates to createDistributionTopic, whose result is returned.
// NOTE(review): the end of the builder chain is not visible in this extract.
235 private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) {
236 DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
237 .statusTopic(topicName)
239 return createDistributionTopic(topicsList, topicName, distributionTopicData);
// Handles the notification topic: prepares audit data that carries the topic name in its
// notification-topic field and delegates to createDistributionTopic, whose result is returned.
// NOTE(review): the end of the builder chain is not visible in this extract.
242 private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) {
244 DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder()
245 .notificationTopic(topicName)
247 return createDistributionTopic(topicsList, topicName, distributionTopicData);
// Makes sure the given topic exists and audits the outcome (audit calls are skipped when no
// ComponentsUtils instance was supplied):
//   - topic already listed in topicsList        -> audit ALREADY_EXISTS, no creation call;
//   - otherwise the topic is created through cambriaHandler with the partition and replication
//     counts from the configuration, and the operation status selects the audit outcome.
// NOTE(review): only the TOPIC_ALREADY_EXIST case label is visible in this extract. Which
// statuses lead to the CREATED and FAILED audits, where isSucceeded is changed, and the
// return statement are not visible - confirm against the complete file.
250 private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) {
252 boolean isSucceeded = true;
254 if (topicsList.contains(topicName)) {
255 if (componentsUtils != null) {
256 componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
// Topic is not in the retrieved list: ask the UEB server to create it.
260 CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
261 deConfiguration.getCreateTopic().getReplicationCount());
263 CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
266 if (componentsUtils != null) {
267 componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED);
// The server reported that the topic already exists even though it was not in topicsList.
270 case TOPIC_ALREADY_EXIST:
271 if (componentsUtils != null) {
272 componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
276 if (componentsUtils != null) {
277 componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED);
// The failure is additionally reported as a UEB system error naming the topic.
279 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
286 public static String buildTopicName(String topicName, String environment) {
287 return topicName + "-" + environment.toUpperCase();
290 public boolean isActive() {
291 return this.status.get();
294 public long getCurrentRetryInterval() {
295 return currentRetryInterval;
298 protected void setCambriaHandler(CambriaHandler cambriaHandler) {
299 this.cambriaHandler = cambriaHandler;