2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import java.util.HashSet;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
31 import org.openecomp.sdc.be.config.BeEcompErrorManager;
32 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
33 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
34 import org.openecomp.sdc.be.impl.ComponentsUtils;
35 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
36 import org.openecomp.sdc.common.config.EcompErrorName;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 import fj.data.Either;
42 public class DistributionEngineInitTask implements Runnable {
44 public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
45 public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
46 public static final String CONSUMER = "CONSUMER";
47 public static final String PRODUCER = "PRODUCER";
48 public static final String CREATED = "CREATED";
49 public static final String FAILED = "FAILED";
50 public static final Integer HTTP_OK = 200;
52 private Long delayBeforeStartFlow = 0l;
53 private DistributionEngineConfiguration deConfiguration;
54 private String envName;
55 private long retryInterval;
56 private long currentRetryInterval;
57 private long maxInterval;
58 // private boolean active = false;
59 boolean maximumRetryInterval = false;
60 private AtomicBoolean status = null;
61 ComponentsUtils componentsUtils = null;
62 DistributionEnginePollingTask distributionEnginePollingTask = null;
64 private CambriaHandler cambriaHandler = new CambriaHandler();
66 public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask) {
68 this.delayBeforeStartFlow = delayBeforeStartFlow;
69 this.deConfiguration = deConfiguration;
70 this.envName = envName;
71 this.retryInterval = deConfiguration.getInitRetryIntervalSec();
72 this.currentRetryInterval = retryInterval;
73 this.maxInterval = deConfiguration.getInitMaxIntervalSec();
75 this.componentsUtils = componentsUtils;
76 this.distributionEnginePollingTask = distributionEnginePollingTask;
79 private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
81 private static Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class.getName());
83 ScheduledFuture<?> scheduledFuture = null;
85 public void startTask() {
86 if (scheduledExecutorService != null) {
87 Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
88 logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
89 this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
94 public void restartTask() {
98 logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
100 long lastCurrentInterval = currentRetryInterval;
101 incrementRetryInterval();
103 this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
107 protected void incrementRetryInterval() {
108 if (currentRetryInterval < maxInterval) {
109 currentRetryInterval *= 2;
110 if (currentRetryInterval > maxInterval) {
111 setMaxRetryInterval();
114 setMaxRetryInterval();
118 private void setMaxRetryInterval() {
119 currentRetryInterval = maxInterval;
120 maximumRetryInterval = true;
121 logger.debug("Set next retry init interval to {}", maxInterval);
124 public void stopTask() {
125 if (scheduledFuture != null) {
126 boolean result = scheduledFuture.cancel(true);
127 logger.debug("Stop reinit task. result = {}", result);
128 if (false == result) {
129 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
130 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
132 scheduledFuture = null;
136 public void destroy() {
138 if (scheduledExecutorService != null) {
139 scheduledExecutorService.shutdown();
146 boolean result = false;
149 if (true == result) {
151 this.status.set(true);
152 if (this.distributionEnginePollingTask != null) {
153 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
154 logger.debug("start polling distribution status topic {}", topicName);
155 this.distributionEnginePollingTask.startTask(topicName);
158 if (false == maximumRetryInterval) {
165 * run initialization flow
169 public boolean initFlow() {
171 logger.trace("Start init flow for environment {}", this.envName);
173 Set<String> topicsList = null;
174 Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
176 getTopicsRes = cambriaHandler.getTopics(deConfiguration.getUebServers());
177 if (getTopicsRes.isRight()) {
178 CambriaErrorResponse status = getTopicsRes.right().value();
179 if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
180 topicsList = new HashSet<>();
182 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
184 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
189 topicsList = getTopicsRes.left().value();
192 String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
193 logger.debug("Going to handle topic {}", notificationTopic);
195 boolean status = createTopicIfNotExists(topicsList, notificationTopic);
196 if (false == status) {
200 CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
202 CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
204 if (createStatus != CambriaOperationStatus.OK) {
208 String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
209 logger.debug("Going to handle topic {}", statusTopic);
210 status = createTopicIfNotExists(topicsList, statusTopic);
211 if (false == status) {
215 CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
217 if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
224 private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
225 CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(deConfiguration.getUebServers(), topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebPublicKey(), subscriberType);
227 String role = CONSUMER;
228 if (subscriberType == SubscriberTypeEnum.PRODUCER) {
231 auditRegistration(topicName, registerStatus, role);
232 return registerStatus;
235 private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
236 if (componentsUtils != null) {
237 Integer httpCode = registerProducerStatus.getHttpCode();
238 String httpCodeStr = String.valueOf(httpCode);
239 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, deConfiguration.getUebPublicKey(), httpCodeStr);
243 private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
245 if (topicsList.contains(topicName)) {
246 if (componentsUtils != null) {
247 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
252 CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(deConfiguration.getUebServers(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
253 deConfiguration.getCreateTopic().getReplicationCount());
255 CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
256 if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
257 if (componentsUtils != null) {
258 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
260 } else if (status == CambriaOperationStatus.OK) {
261 if (componentsUtils != null) {
262 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
265 if (componentsUtils != null) {
266 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
268 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
270 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
278 public static String buildTopicName(String topicName, String environment) {
279 return topicName + "-" + environment.toUpperCase();
282 public boolean isActive() {
283 return this.status.get();
286 public long getCurrentRetryInterval() {
287 return currentRetryInterval;
290 protected void setCambriaHandler(CambriaHandler cambriaHandler) {
291 this.cambriaHandler = cambriaHandler;