2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.distribution.engine;
22 import fj.data.Either;
23 import java.util.HashSet;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.stream.Collectors;
31 import org.openecomp.sdc.be.config.BeEcompErrorManager;
32 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
33 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
34 import org.openecomp.sdc.be.impl.ComponentsUtils;
35 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
36 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
37 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
38 import org.openecomp.sdc.common.log.wrappers.Logger;
40 public class DistributionEngineInitTask implements Runnable {
42 public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
43 public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
44 public static final String CONSUMER = "CONSUMER";
45 public static final String PRODUCER = "PRODUCER";
46 public static final String CREATED = "CREATED";
47 public static final String FAILED = "FAILED";
48 public static final Integer HTTP_OK = 200;
49 private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName());
50 boolean maximumRetryInterval = false;
51 ComponentsUtils componentsUtils = null;
52 DistributionEnginePollingTask distributionEnginePollingTask = null;
53 ScheduledFuture<?> scheduledFuture = null;
54 private Long delayBeforeStartFlow = 0l;
55 private DistributionEngineConfiguration deConfiguration;
56 private String envName;
57 private long retryInterval;
58 private long currentRetryInterval;
59 private long maxInterval;
60 private AtomicBoolean status = null;
61 private OperationalEnvironmentEntry environmentEntry;
62 private CambriaHandler cambriaHandler = new CambriaHandler();
63 private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
// Creates the init task for one environment and stores its collaborators.
// The retry interval and its upper bound are read from deConfiguration (getInitRetryIntervalSec / getInitMaxIntervalSec);
// the interval currently in effect starts at the configured retry interval.
// NOTE(review): the embedded numbering skips 68 and 75, and the `status` parameter is not assigned in the visible
// lines although this.status is dereferenced elsewhere in the class - presumably the assignment sits on a line
// that is missing from this view; confirm against the full file.
65 public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
66 AtomicBoolean status, ComponentsUtils componentsUtils,
67 DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
69 this.delayBeforeStartFlow = delayBeforeStartFlow;
70 this.deConfiguration = deConfiguration;
71 this.envName = envName;
72 this.retryInterval = deConfiguration.getInitRetryIntervalSec();
// The first back-off step starts from the configured retry interval.
73 this.currentRetryInterval = retryInterval;
74 this.maxInterval = deConfiguration.getInitMaxIntervalSec();
76 this.componentsUtils = componentsUtils;
77 this.distributionEnginePollingTask = distributionEnginePollingTask;
78 this.environmentEntry = environmentEntry;
81 public static String buildTopicName(String topicName, String environment) {
82 return topicName + "-" + environment.toUpperCase();
85 public void startTask() {
86 if (scheduledExecutorService != null) {
87 Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
88 logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval,
89 delayBeforeStartFlow);
90 this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
// Re-schedules this task with a backed-off period: the next run is delayed by the interval currently in effect,
// and the period is the interval after incrementRetryInterval() has advanced it.
// NOTE(review): the embedded numbering jumps from 94 to 96, so one statement is missing from this view
// (presumably the previous schedule is cancelled before re-scheduling) - confirm against the full file.
94 public void restartTask() {
96 logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
// Capture the interval before it is advanced; it serves as the initial delay below.
97 long lastCurrentInterval = currentRetryInterval;
98 incrementRetryInterval();
99 this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
102 protected void incrementRetryInterval() {
103 if (currentRetryInterval < maxInterval) {
104 currentRetryInterval *= 2;
105 if (currentRetryInterval > maxInterval) {
106 setMaxRetryInterval();
109 setMaxRetryInterval();
113 private void setMaxRetryInterval() {
114 currentRetryInterval = maxInterval;
115 maximumRetryInterval = true;
116 logger.debug("Set next retry init interval to {}", maxInterval);
// Cancels the scheduled execution of this task, if one exists, and clears the stored handle.
119 public void stopTask() {
120 if (scheduledFuture != null) {
// cancel(true) also requests interruption of an execution that is currently in progress.
121 boolean result = scheduledFuture.cancel(true);
122 logger.debug("Stop reinit task. result = {}", result);
// NOTE(review): embedded lines 123 and 125 are not visible in this view; presumably the system error below is
// logged only when the cancellation did not succeed - confirm against the full file.
124 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
// Drop the handle so that a later call becomes a no-op.
126 scheduledFuture = null;
// Releases the scheduler owned by this task.
// NOTE(review): embedded line 131 is not visible in this view; confirm whether the scheduled execution is
// cancelled before the scheduler is shut down.
130 public void destroy() {
132 if (scheduledExecutorService != null) {
// shutdown() stops the scheduler from accepting new tasks; it does not wait for termination.
133 scheduledExecutorService.shutdown();
// NOTE(review): the method header and the statements that assign and test `result` (embedded 136-138, 140-142) are
// not visible in this view; presumably this is the Runnable.run() body and `result` holds the outcome of
// initFlow() - confirm against the full file.
139 boolean result = false;
// Publish the active state; this flag is what isActive() reports.
143 this.status.set(true);
// When a polling task was supplied, start it on the environment-specific status topic.
144 if (this.distributionEnginePollingTask != null) {
145 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
146 logger.debug("start polling distribution status topic {}", topicName);
147 this.distributionEnginePollingTask.startTask(topicName);
// maximumRetryInterval is set by setMaxRetryInterval() once the back-off has been capped.
// NOTE(review): the body of this branch (embedded 151) is not visible; presumably the task is re-scheduled only
// while the cap has not been reached - confirm.
150 if (!maximumRetryInterval) {
157 * run initialization flow
// Runs the initialization flow for this environment: fetches the existing topics from the U-EB addresses of
// environmentEntry, handles the notification topic and registers to it as PRODUCER, then handles the status topic
// and registers to it as CONSUMER. The visible return statement yields true only when the CONSUMER registration
// reports CambriaOperationStatus.OK.
// NOTE(review): the bodies of the failure branches (embedded 170, 172-174, 176, 180-181, 185-186, 190-191) are not
// visible in this view; presumably each failed step returns false - confirm against the full file.
161 public boolean initFlow() {
162 logger.trace("Start init flow for environment {}", this.envName);
163 Set<String> topicsList = null;
164 Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
// The address collection is copied into a new List before it is handed to the handler.
165 getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
166 if (getTopicsRes.isRight()) {
// This local `status` shadows the AtomicBoolean field of the same name.
167 CambriaErrorResponse status = getTopicsRes.right().value();
// NOT_FOUND is not treated as a failure: the flow continues with an empty topic set.
168 if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
169 topicsList = new HashSet<>();
// Any other error from the topic listing is reported as a U-EB system error.
171 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
175 topicsList = getTopicsRes.left().value();
// Notification topic: make sure it exists, then register as PRODUCER.
177 String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
178 logger.debug("Going to handle topic {}", notificationTopic);
179 if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
182 CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
183 CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
184 if (createStatus != CambriaOperationStatus.OK) {
// Status topic: make sure it exists, then register as CONSUMER.
187 String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
188 logger.debug("Going to handle topic {}", statusTopic);
189 if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
192 CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
193 return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
// Registers the environment's API key to topicName with the given subscriber type, audits the outcome and returns
// the Cambria response of the last attempt made.
196 private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
// First attempt: authenticate with the API key and secret stored on the environment entry.
197 CambriaErrorResponse registerStatus = cambriaHandler
198 .registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
199 environmentEntry.getUebApikey(), subscriberType, topicName);
// On an authentication or connection error, retry once with the public/secret key pair from deConfiguration;
// the key being registered (fourth argument) is still the environment's API key.
201 if (CambriaOperationStatus.AUTHENTICATION_ERROR.equals(registerStatus.getOperationStatus())
202 || CambriaOperationStatus.CONNNECTION_ERROR.equals(registerStatus.getOperationStatus())){
203 registerStatus = cambriaHandler
204 .registerToTopic(environmentEntry.getDmaapUebAddress(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(),
205 environmentEntry.getUebApikey(), subscriberType, topicName);
// The audit role defaults to CONSUMER.
// NOTE(review): the body of the PRODUCER branch (embedded 210) is not visible in this view; presumably it switches
// the role to PRODUCER - confirm against the full file.
208 String role = CONSUMER;
209 if (subscriberType == SubscriberTypeEnum.PRODUCER) {
212 auditRegistration(topicName, registerStatus, role);
213 return registerStatus;
216 private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
217 if (componentsUtils != null) {
218 Integer httpCode = registerProducerStatus.getHttpCode();
219 String httpCodeStr = String.valueOf(httpCode);
220 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName,
221 DistributionTopicData.newBuilder().notificationTopic(notificationTopic).build(), role, environmentEntry.getUebApikey(), httpCodeStr);
225 private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) {
226 DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().statusTopic(topicName).build();
227 return createDistributionTopic(topicsList, topicName, distributionTopicData);
230 private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) {
231 DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().notificationTopic(topicName).build();
232 return createDistributionTopic(topicsList, topicName, distributionTopicData);
// Makes sure topicName exists and audits the outcome (CREATE_DISTRIBUTION_TOPIC) through componentsUtils whenever
// componentsUtils is available. Audit outcomes used below: ALREADY_EXISTS, CREATED, FAILED.
// NOTE(review): many lines of this method are not visible in this view (embedded 239, 241-243, 248-249, 251,
// 253-254, 257, 259-261, 263, 265, 267-270), including the receiver of the audit calls, the dispatch on `status`,
// and every return statement. The branch notes below are inferred from the audit constants and must be confirmed
// against the full file.
235 private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) {
236 boolean isSucceeded = true;
// The topic is already in the listing: audit ALREADY_EXISTS.
237 if (topicsList.contains(topicName)) {
238 if (componentsUtils != null) {
240 .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
// Ask the server to create the topic with the environment's API key and secret; partition and replication counts
// come from deConfiguration.getCreateTopic().
244 CambriaErrorResponse createDistribTopicStatus = cambriaHandler
245 .createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName,
246 deConfiguration.getCreateTopic().getPartitionCount(), deConfiguration.getCreateTopic().getReplicationCount());
247 CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
// Presumably the success case: audit CREATED.
250 if (componentsUtils != null) {
252 .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED);
// The server reports that the topic already exists: audit ALREADY_EXISTS.
255 case TOPIC_ALREADY_EXIST:
256 if (componentsUtils != null) {
258 .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
// Presumably every other status: audit FAILED and report a U-EB system error.
262 if (componentsUtils != null) {
264 .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED);
266 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
273 public boolean isActive() {
274 return this.status.get();
277 public long getCurrentRetryInterval() {
278 return currentRetryInterval;
281 protected void setCambriaHandler(CambriaHandler cambriaHandler) {
282 this.cambriaHandler = cambriaHandler;