2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.distribution.engine;
22 import com.att.nsa.cambria.client.CambriaConsumer;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import fj.data.Either;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
31 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
32 import org.openecomp.sdc.be.config.BeEcompErrorManager;
33 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
34 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
35 import org.openecomp.sdc.be.impl.ComponentsUtils;
36 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
37 import org.openecomp.sdc.common.log.wrappers.Logger;
38 import org.openecomp.sdc.common.log.wrappers.LoggerSdcAudit;
40 public class DistributionEnginePollingTask implements Runnable {
42 public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
43 private static final String PARTNER_NAME = "UNKNOWN";
44 private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName());
45 private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class);
46 ScheduledFuture<?> scheduledFuture = null;
47 private String topicName;
48 private ComponentsUtils componentUtils;
49 private int fetchTimeoutInSec = 15;
50 private int pollingIntervalInSec;
51 private String consumerId;
52 private String consumerGroup;
53 private CambriaHandler cambriaHandler = new CambriaHandler();
54 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
55 private DistributionCompleteReporter distributionCompleteReporter;
56 private ScheduledExecutorService scheduledPollingService = Executors
57 .newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
58 private CambriaConsumer cambriaConsumer = null;
59 private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
60 private OperationalEnvironmentEntry environmentEntry;
62 public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration,
63 DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils,
64 DistributionEngineClusterHealth distributionEngineClusterHealth,
65 OperationalEnvironmentEntry environmentEntry) {
66 this.componentUtils = componentUtils;
67 DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
68 this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
69 this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
70 this.consumerGroup = statusConfig.getConsumerGroup();
71 this.consumerId = statusConfig.getConsumerId();
72 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
73 this.environmentEntry = environmentEntry;
74 this.distributionCompleteReporter = distributionCompleteReporter;
77 public void startTask(String topicName) {
78 this.topicName = topicName;
79 logger.debug("start task for polling topic {}", topicName);
80 if (fetchTimeoutInSec < 15) {
81 logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
82 fetchTimeoutInSec = 15;
85 cambriaConsumer = cambriaHandler
86 .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
87 consumerId, consumerGroup, fetchTimeoutInSec * 1000);
88 if (scheduledPollingService != null) {
89 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
90 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
92 } catch (Exception e) {
93 logger.debug("unexpected error occured", e);
94 String methodName = Object.class.getEnclosingMethod().getName();
95 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
99 public void stopTask() {
100 if (scheduledFuture != null) {
101 boolean result = scheduledFuture.cancel(true);
102 logger.debug("Stop polling task. result = {}", result);
104 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
106 scheduledFuture = null;
108 if (cambriaConsumer != null) {
109 logger.debug("close consumer");
110 cambriaHandler.closeConsumer(cambriaConsumer);
114 public void destroy() {
121 logger.trace("run() method. polling queue {}", topicName);
124 if (cambriaConsumer == null) {
125 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
129 Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
131 if (fetchResult.isRight()) {
132 CambriaErrorResponse errorResponse = fetchResult.right().value();
133 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
134 "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
135 // TODO: if status== internal error (connection problem) change
139 // in next try, if succeed - change to active
143 Iterable<String> messages = fetchResult.left().value();
144 for (String message : messages) {
145 logger.trace("received message {}", message);
147 DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
148 audit.startAuditFetchLog(PARTNER_NAME, DistributionEnginePollingTask.class.getName());
149 handleDistributionNotificationMsg(notification, audit);
150 distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
151 } catch (Exception e) {
152 logger.debug("failed to convert message to object", e);
153 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
154 "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
157 } catch (Exception e) {
158 logger.debug("unexpected error occurred", e);
159 String methodName = Object.class.getEnclosingMethod().getName();
160 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
164 private void handleDistributionNotificationMsg(DistributionStatusNotification notification, LoggerSdcAudit audit) {
165 componentUtils.auditDistributionStatusNotification(notification.getDistributionID(), notification.getConsumerID(), topicName,
166 notification.getArtifactURL(), String.valueOf(notification.getTimestamp()), notification.getStatus().name(),
167 notification.getErrorReason(), audit);
168 if (notification.isDistributionCompleteNotification()) {
169 distributionCompleteReporter.reportDistributionComplete(notification);
173 private void shutdownExecutor() {
174 if (scheduledPollingService == null) {
177 scheduledPollingService.shutdown(); // Disable new tasks from being
181 // Wait a while for existing tasks to terminate
182 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
183 scheduledPollingService.shutdownNow(); // Cancel currently
187 // Wait a while for tasks to respond to being cancelled
188 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
189 logger.debug("Pool did not terminate");
192 } catch (InterruptedException ie) {
193 // (Re-)Cancel if current thread also interrupted
194 scheduledPollingService.shutdownNow();
195 // Preserve interrupt status
196 Thread.currentThread().interrupt();