/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
28 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
29 import org.openecomp.sdc.be.config.BeEcompErrorManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
32 import org.openecomp.sdc.be.impl.ComponentsUtils;
33 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
34 import org.openecomp.sdc.common.config.EcompErrorName;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 import com.att.nsa.cambria.client.CambriaConsumer;
39 import com.google.gson.Gson;
40 import com.google.gson.GsonBuilder;
42 import fj.data.Either;
44 public class DistributionEnginePollingTask implements Runnable {
46 public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
48 private String topicName;
49 private ComponentsUtils componentUtils;
50 private int fetchTimeoutInSec = 15;
51 private int pollingIntervalInSec;
52 private String consumerId;
53 private String consumerGroup;
54 private DistributionEngineConfiguration distributionEngineConfiguration;
56 private CambriaHandler cambriaHandler = new CambriaHandler();
57 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
59 private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
61 private static Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class.getName());
63 ScheduledFuture<?> scheduledFuture = null;
64 private CambriaConsumer cambriaConsumer = null;
66 private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
68 public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, String envName, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth) {
70 this.componentUtils = componentUtils;
71 this.distributionEngineConfiguration = distributionEngineConfiguration;
72 DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
73 this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
74 this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
75 this.consumerGroup = statusConfig.getConsumerGroup();
76 this.consumerId = statusConfig.getConsumerId();
77 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
80 public void startTask(String topicName) {
82 this.topicName = topicName;
83 logger.debug("start task for polling topic {}", topicName);
84 if (fetchTimeoutInSec < 15) {
85 logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
86 fetchTimeoutInSec = 15;
89 cambriaConsumer = cambriaHandler.createConsumer(distributionEngineConfiguration.getUebServers(), topicName, distributionEngineConfiguration.getUebPublicKey(), distributionEngineConfiguration.getUebSecretKey(), consumerId, consumerGroup,
90 fetchTimeoutInSec * 1000);
92 if (scheduledPollingService != null) {
93 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
94 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
97 } catch (Exception e) {
98 logger.debug("unexpected error occured", e);
99 String methodName = new Object() {
100 }.getClass().getEnclosingMethod().getName();
102 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
103 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
107 public void stopTask() {
108 if (scheduledFuture != null) {
109 boolean result = scheduledFuture.cancel(true);
110 logger.debug("Stop polling task. result = {}", result);
111 if (false == result) {
112 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
113 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
115 scheduledFuture = null;
117 if (cambriaConsumer != null) {
118 logger.debug("close consumer");
119 cambriaHandler.closeConsumer(cambriaConsumer);
124 public void destroy() {
131 logger.trace("run() method. polling queue {}", topicName);
135 if (cambriaConsumer == null) {
136 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
137 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
142 Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
144 if (fetchResult.isRight()) {
145 CambriaErrorResponse errorResponse = fetchResult.right().value();
146 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
147 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
149 // TODO: if status== internal error (connection problem) change
151 // in next try, if succeed - change to active
156 Iterable<String> messages = fetchResult.left().value();
157 for (String message : messages) {
158 logger.trace("received message {}", message);
160 DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
161 componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
162 String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
164 distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
166 } catch (Exception e) {
167 logger.debug("failed to convert message to object", e);
168 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
169 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
173 } catch (Exception e) {
174 logger.debug("unexpected error occured", e);
175 String methodName = new Object() {
176 }.getClass().getEnclosingMethod().getName();
178 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
179 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
184 private void shutdownExecutor() {
185 if (scheduledPollingService == null)
188 scheduledPollingService.shutdown(); // Disable new tasks from being
191 // Wait a while for existing tasks to terminate
192 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
193 scheduledPollingService.shutdownNow(); // Cancel currently
195 // Wait a while for tasks to respond to being cancelled
196 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
197 logger.debug("Pool did not terminate");
199 } catch (InterruptedException ie) {
200 // (Re-)Cancel if current thread also interrupted
201 scheduledPollingService.shutdownNow();
202 // Preserve interrupt status
203 Thread.currentThread().interrupt();