2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
28 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
29 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
30 import org.openecomp.sdc.be.config.BeEcompErrorManager;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
32 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import com.att.nsa.cambria.client.CambriaConsumer;
40 import com.google.gson.Gson;
41 import com.google.gson.GsonBuilder;
43 import fj.data.Either;
45 public class DistributionEnginePollingTask implements Runnable {
47 public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
49 private String topicName;
50 private ComponentsUtils componentUtils;
51 private int fetchTimeoutInSec = 15;
52 private int pollingIntervalInSec;
53 private String consumerId;
54 private String consumerGroup;
56 private CambriaHandler cambriaHandler = new CambriaHandler();
57 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
58 private DistributionCompleteReporter distributionCompleteReporter;
60 private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
62 private static final Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class);
64 ScheduledFuture<?> scheduledFuture = null;
65 private CambriaConsumer cambriaConsumer = null;
67 private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
69 private OperationalEnvironmentEntry environmentEntry;
71 public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) {
73 this.componentUtils = componentUtils;
74 DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
75 this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
76 this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
77 this.consumerGroup = statusConfig.getConsumerGroup();
78 this.consumerId = statusConfig.getConsumerId();
79 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
80 this.environmentEntry = environmentEntry;
81 this.distributionCompleteReporter = distributionCompleteReporter;
84 public void startTask(String topicName) {
86 this.topicName = topicName;
87 logger.debug("start task for polling topic {}", topicName);
88 if (fetchTimeoutInSec < 15) {
89 logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
90 fetchTimeoutInSec = 15;
93 cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup,
94 fetchTimeoutInSec * 1000);
96 if (scheduledPollingService != null) {
97 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
98 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
101 } catch (Exception e) {
102 logger.debug("unexpected error occured", e);
103 String methodName = new Object() {
104 }.getClass().getEnclosingMethod().getName();
106 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
110 public void stopTask() {
111 if (scheduledFuture != null) {
112 boolean result = scheduledFuture.cancel(true);
113 logger.debug("Stop polling task. result = {}", result);
114 if (false == result) {
115 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
117 scheduledFuture = null;
119 if (cambriaConsumer != null) {
120 logger.debug("close consumer");
121 cambriaHandler.closeConsumer(cambriaConsumer);
126 public void destroy() {
133 logger.trace("run() method. polling queue {}", topicName);
137 if (cambriaConsumer == null) {
138 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
143 Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
145 if (fetchResult.isRight()) {
146 CambriaErrorResponse errorResponse = fetchResult.right().value();
147 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
149 // TODO: if status== internal error (connection problem) change
151 // in next try, if succeed - change to active
156 Iterable<String> messages = fetchResult.left().value();
157 for (String message : messages) {
158 logger.trace("received message {}", message);
160 DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
161 handleDistributionNotificationMsg(notification);
162 distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
163 } catch (Exception e) {
164 logger.debug("failed to convert message to object", e);
165 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
169 } catch (Exception e) {
170 logger.debug("unexpected error occured", e);
171 String methodName = new Object() {
172 }.getClass().getEnclosingMethod().getName();
174 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
179 private void handleDistributionNotificationMsg(DistributionStatusNotification notification) {
180 componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
181 String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
182 if (notification.isDistributionCompleteNotification()) {
183 distributionCompleteReporter.reportDistributionComplete(notification);
187 private void shutdownExecutor() {
188 if (scheduledPollingService == null)
191 scheduledPollingService.shutdown(); // Disable new tasks from being
194 // Wait a while for existing tasks to terminate
195 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
196 scheduledPollingService.shutdownNow(); // Cancel currently
198 // Wait a while for tasks to respond to being cancelled
199 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
200 logger.debug("Pool did not terminate");
202 } catch (InterruptedException ie) {
203 // (Re-)Cancel if current thread also interrupted
204 scheduledPollingService.shutdownNow();
205 // Preserve interrupt status
206 Thread.currentThread().interrupt();