d41d334bff25a0d31641aa73aed66f3840cf9260
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
29 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
30 import org.openecomp.sdc.be.config.BeEcompErrorManager;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
32 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
33 import org.openecomp.sdc.be.impl.ComponentsUtils;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.att.nsa.cambria.client.CambriaConsumer;
40 import com.google.gson.Gson;
41 import com.google.gson.GsonBuilder;
42
43 import fj.data.Either;
44
45 public class DistributionEnginePollingTask implements Runnable {
46
47     public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
48
49     private String topicName;
50     private ComponentsUtils componentUtils;
51     private int fetchTimeoutInSec = 15;
52     private int pollingIntervalInSec;
53     private String consumerId;
54     private String consumerGroup;
55
56     private CambriaHandler cambriaHandler = new CambriaHandler();
57     private Gson gson = new GsonBuilder().setPrettyPrinting().create();
58     private DistributionCompleteReporter distributionCompleteReporter;
59
60     private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
61
62     private static final Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class);
63
64     ScheduledFuture<?> scheduledFuture = null;
65     private CambriaConsumer cambriaConsumer = null;
66
67     private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
68
69     private OperationalEnvironmentEntry environmentEntry;
70
71     public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) {
72
73         this.componentUtils = componentUtils;
74         DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
75         this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
76         this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
77         this.consumerGroup = statusConfig.getConsumerGroup();
78         this.consumerId = statusConfig.getConsumerId();
79         this.distributionEngineClusterHealth = distributionEngineClusterHealth;
80         this.environmentEntry = environmentEntry;
81         this.distributionCompleteReporter = distributionCompleteReporter;
82     }
83
84     public void startTask(String topicName) {
85
86         this.topicName = topicName;
87         logger.debug("start task for polling topic {}", topicName);
88         if (fetchTimeoutInSec < 15) {
89             logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
90             fetchTimeoutInSec = 15;
91         }
92         try {
93             cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup,
94                     fetchTimeoutInSec * 1000);
95
96             if (scheduledPollingService != null) {
97                 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
98                 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
99
100             }
101         } catch (Exception e) {
102             logger.debug("unexpected error occured", e);
103             String methodName = new Object() {
104             }.getClass().getEnclosingMethod().getName();
105
106             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
107         }
108     }
109
110     public void stopTask() {
111         if (scheduledFuture != null) {
112             boolean result = scheduledFuture.cancel(true);
113             logger.debug("Stop polling task. result = {}", result);
114             if (false == result) {
115                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
116             }
117             scheduledFuture = null;
118         }
119         if (cambriaConsumer != null) {
120             logger.debug("close consumer");
121             cambriaHandler.closeConsumer(cambriaConsumer);
122         }
123
124     }
125
126     public void destroy() {
127         this.stopTask();
128         shutdownExecutor();
129     }
130
131     @Override
132     public void run() {
133         logger.trace("run() method. polling queue {}", topicName);
134
135         try {
136             // init error
137             if (cambriaConsumer == null) {
138                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
139                 stopTask();
140                 return;
141             }
142
143             Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
144             // fetch error
145             if (fetchResult.isRight()) {
146                 CambriaErrorResponse errorResponse = fetchResult.right().value();
147                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
148
149                 // TODO: if status== internal error (connection problem) change
150                 // state to inactive
151                 // in next try, if succeed - change to active
152                 return;
153             }
154
155             // success
156             Iterable<String> messages = fetchResult.left().value();
157             for (String message : messages) {
158                 logger.trace("received message {}", message);
159                 try {
160                     DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
161                     handleDistributionNotificationMsg(notification);
162                     distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
163                 } catch (Exception e) {
164                     logger.debug("failed to convert message to object", e);
165                     BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
166                 }
167
168             }
169         } catch (Exception e) {
170             logger.debug("unexpected error occured", e);
171             String methodName = new Object() {
172             }.getClass().getEnclosingMethod().getName();
173
174             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
175         }
176
177     }
178
179     private void handleDistributionNotificationMsg(DistributionStatusNotification notification) {
180         componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
181                 String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
182         if (notification.isDistributionCompleteNotification()) {
183             distributionCompleteReporter.reportDistributionComplete(notification);
184         }
185     }
186
187     private void shutdownExecutor() {
188         if (scheduledPollingService == null)
189             return;
190
191         scheduledPollingService.shutdown(); // Disable new tasks from being
192         // submitted
193         try {
194             // Wait a while for existing tasks to terminate
195             if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
196                 scheduledPollingService.shutdownNow(); // Cancel currently
197                 // executing tasks
198                 // Wait a while for tasks to respond to being cancelled
199                 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
200                     logger.debug("Pool did not terminate");
201             }
202         } catch (InterruptedException ie) {
203             // (Re-)Cancel if current thread also interrupted
204             scheduledPollingService.shutdownNow();
205             // Preserve interrupt status
206             Thread.currentThread().interrupt();
207         }
208     }
209
210 }