2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.distribution.engine;
22 import com.att.nsa.cambria.client.CambriaConsumer;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import fj.data.Either;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
31 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
32 import org.openecomp.sdc.be.components.kafka.KafkaHandler;
33 import org.openecomp.sdc.be.config.BeEcompErrorManager;
34 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
35 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
36 import org.openecomp.sdc.be.impl.ComponentsUtils;
37 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
38 import org.openecomp.sdc.common.log.wrappers.Logger;
39 import org.openecomp.sdc.common.log.wrappers.LoggerSdcAudit;
41 public class DistributionEnginePollingTask implements Runnable {
43 public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
44 private static final String PARTNER_NAME = "UNKNOWN";
45 private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName());
46 private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class);
47 ScheduledFuture<?> scheduledFuture = null;
48 private String topicName;
49 private ComponentsUtils componentUtils;
50 private int fetchTimeoutInSec = 15;
51 private int pollingIntervalInSec;
52 private String consumerId;
53 private String consumerGroup;
54 private CambriaHandler cambriaHandler = new CambriaHandler();
55 private final KafkaHandler kafkaHandler = new KafkaHandler();
56 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
57 private DistributionCompleteReporter distributionCompleteReporter;
58 private ScheduledExecutorService scheduledPollingService = Executors
59 .newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
60 private CambriaConsumer cambriaConsumer = null;
61 private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
62 private OperationalEnvironmentEntry environmentEntry;
64 public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration,
65 DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils,
66 DistributionEngineClusterHealth distributionEngineClusterHealth,
67 OperationalEnvironmentEntry environmentEntry) {
68 this.componentUtils = componentUtils;
69 DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
70 this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
71 this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
72 this.consumerGroup = statusConfig.getConsumerGroup();
73 this.consumerId = statusConfig.getConsumerId();
74 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
75 this.environmentEntry = environmentEntry;
76 this.distributionCompleteReporter = distributionCompleteReporter;
79 public void startTask(String topicName) {
80 this.topicName = topicName;
81 logger.debug("start task for polling topic {}", topicName);
82 if (fetchTimeoutInSec < 15) {
83 logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
84 fetchTimeoutInSec = 15;
87 if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
88 cambriaConsumer = cambriaHandler
89 .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(),
90 environmentEntry.getUebSecretKey(),
91 consumerId, consumerGroup, fetchTimeoutInSec * 1000);
93 if (scheduledPollingService != null) {
94 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
95 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
97 } catch (Exception e) {
98 logger.debug("unexpected error occured", e);
99 String methodName = Object.class.getEnclosingMethod().getName();
100 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
104 public void stopTask() {
105 if (scheduledFuture != null) {
106 boolean result = scheduledFuture.cancel(true);
107 logger.debug("Stop polling task. result = {}", result);
109 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
111 scheduledFuture = null;
113 if (cambriaConsumer != null) {
114 logger.debug("close consumer");
115 cambriaHandler.closeConsumer(cambriaConsumer);
119 public void destroy() {
126 logger.trace("run() method. polling queue {}", topicName);
127 Either<Iterable<String>, CambriaErrorResponse> fetchResult;
130 if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
131 if (cambriaConsumer == null) {
132 BeEcompErrorManager.getInstance()
133 .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
137 fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
139 fetchResult = kafkaHandler.fetchFromTopic(topicName);
142 if (fetchResult.isRight()) {
143 CambriaErrorResponse errorResponse = fetchResult.right().value();
144 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
145 "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
146 // TODO: if status== internal error (connection problem) change
150 // in next try, if succeed - change to active
154 Iterable<String> messages = fetchResult.left().value();
155 for (String message : messages) {
156 logger.trace("received message {}", message);
158 DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
159 audit.startAuditFetchLog(PARTNER_NAME, DistributionEnginePollingTask.class.getName());
160 handleDistributionNotificationMsg(notification, audit);
161 distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
162 } catch (Exception e) {
163 logger.debug("failed to convert message to object", e);
164 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
165 "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
168 } catch (Exception e) {
169 logger.debug("unexpected error occurred", e);
170 String methodName = Object.class.getEnclosingMethod().getName();
171 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
175 private void handleDistributionNotificationMsg(DistributionStatusNotification notification, LoggerSdcAudit audit) {
176 componentUtils.auditDistributionStatusNotification(notification.getDistributionID(), notification.getConsumerID(), topicName,
177 notification.getArtifactURL(), String.valueOf(notification.getTimestamp()), notification.getStatus().name(),
178 notification.getErrorReason(), audit);
179 if (notification.isDistributionCompleteNotification()) {
180 distributionCompleteReporter.reportDistributionComplete(notification);
184 private void shutdownExecutor() {
185 if (scheduledPollingService == null) {
188 scheduledPollingService.shutdown(); // Disable new tasks from being
192 // Wait a while for existing tasks to terminate
193 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
194 scheduledPollingService.shutdownNow(); // Cancel currently
198 // Wait a while for tasks to respond to being cancelled
199 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
200 logger.debug("Pool did not terminate");
203 } catch (InterruptedException ie) {
204 // (Re-)Cancel if current thread also interrupted
205 scheduledPollingService.shutdownNow();
206 // Preserve interrupt status
207 Thread.currentThread().interrupt();