2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import com.att.nsa.cambria.client.CambriaConsumer;
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import fj.data.Either;
27 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
28 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
29 import org.openecomp.sdc.be.config.BeEcompErrorManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
32 import org.openecomp.sdc.be.impl.ComponentsUtils;
33 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
34 import org.openecomp.sdc.common.log.wrappers.Logger;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledFuture;
39 import java.util.concurrent.TimeUnit;
41 public class DistributionEnginePollingTask implements Runnable {
43 public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
45 private String topicName;
46 private ComponentsUtils componentUtils;
47 private int fetchTimeoutInSec = 15;
48 private int pollingIntervalInSec;
49 private String consumerId;
50 private String consumerGroup;
52 private CambriaHandler cambriaHandler = new CambriaHandler();
53 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
54 private DistributionCompleteReporter distributionCompleteReporter;
56 private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
58 private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName());
60 ScheduledFuture<?> scheduledFuture = null;
61 private CambriaConsumer cambriaConsumer = null;
63 private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
65 private OperationalEnvironmentEntry environmentEntry;
67 public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) {
69 this.componentUtils = componentUtils;
70 DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
71 this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
72 this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
73 this.consumerGroup = statusConfig.getConsumerGroup();
74 this.consumerId = statusConfig.getConsumerId();
75 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
76 this.environmentEntry = environmentEntry;
77 this.distributionCompleteReporter = distributionCompleteReporter;
80 public void startTask(String topicName) {
82 this.topicName = topicName;
83 logger.debug("start task for polling topic {}", topicName);
84 if (fetchTimeoutInSec < 15) {
85 logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
86 fetchTimeoutInSec = 15;
89 cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup,
90 fetchTimeoutInSec * 1000);
92 if (scheduledPollingService != null) {
93 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
94 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
97 } catch (Exception e) {
98 logger.debug("unexpected error occured", e);
99 String methodName = new Object() {
100 }.getClass().getEnclosingMethod().getName();
102 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
106 public void stopTask() {
107 if (scheduledFuture != null) {
108 boolean result = scheduledFuture.cancel(true);
109 logger.debug("Stop polling task. result = {}", result);
111 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
113 scheduledFuture = null;
115 if (cambriaConsumer != null) {
116 logger.debug("close consumer");
117 cambriaHandler.closeConsumer(cambriaConsumer);
122 public void destroy() {
129 logger.trace("run() method. polling queue {}", topicName);
133 if (cambriaConsumer == null) {
134 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
139 Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
141 if (fetchResult.isRight()) {
142 CambriaErrorResponse errorResponse = fetchResult.right().value();
143 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
145 // TODO: if status== internal error (connection problem) change
147 // in next try, if succeed - change to active
152 Iterable<String> messages = fetchResult.left().value();
153 for (String message : messages) {
154 logger.trace("received message {}", message);
156 DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
157 handleDistributionNotificationMsg(notification);
158 distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
159 } catch (Exception e) {
160 logger.debug("failed to convert message to object", e);
161 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
165 } catch (Exception e) {
166 logger.debug("unexpected error occured", e);
167 String methodName = new Object() {
168 }.getClass().getEnclosingMethod().getName();
170 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
175 private void handleDistributionNotificationMsg(DistributionStatusNotification notification) {
176 componentUtils.auditDistributionStatusNotification(notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
177 String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
178 if (notification.isDistributionCompleteNotification()) {
179 distributionCompleteReporter.reportDistributionComplete(notification);
183 private void shutdownExecutor() {
184 if (scheduledPollingService == null)
187 scheduledPollingService.shutdown(); // Disable new tasks from being
190 // Wait a while for existing tasks to terminate
191 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
192 scheduledPollingService.shutdownNow(); // Cancel currently
194 // Wait a while for tasks to respond to being cancelled
195 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
196 logger.debug("Pool did not terminate");
198 } catch (InterruptedException ie) {
199 // (Re-)Cancel if current thread also interrupted
200 scheduledPollingService.shutdownNow();
201 // Preserve interrupt status
202 Thread.currentThread().interrupt();