Catalog alignment
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEnginePollingTask.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import com.att.nsa.cambria.client.CambriaConsumer;
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import fj.data.Either;
27 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
28 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
29 import org.openecomp.sdc.be.config.BeEcompErrorManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
32 import org.openecomp.sdc.be.impl.ComponentsUtils;
33 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
34 import org.openecomp.sdc.common.log.wrappers.Logger;
35 import org.openecomp.sdc.common.log.wrappers.LoggerSdcAudit;
36
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.ScheduledExecutorService;
39 import java.util.concurrent.ScheduledFuture;
40 import java.util.concurrent.TimeUnit;
41
42 public class DistributionEnginePollingTask implements Runnable {
43
44     public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
45     private static final String PARTNER_NAME = "UNKNOWN";
46
47     private String topicName;
48     private ComponentsUtils componentUtils;
49     private int fetchTimeoutInSec = 15;
50     private int pollingIntervalInSec;
51     private String consumerId;
52     private String consumerGroup;
53
54     private CambriaHandler cambriaHandler = new CambriaHandler();
55     private Gson gson = new GsonBuilder().setPrettyPrinting().create();
56     private DistributionCompleteReporter distributionCompleteReporter;
57
58     private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
59
60     private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName());
61     private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class);
62
63     ScheduledFuture<?> scheduledFuture = null;
64     private CambriaConsumer cambriaConsumer = null;
65
66     private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
67
68     private OperationalEnvironmentEntry environmentEntry;
69
70     public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth, OperationalEnvironmentEntry environmentEntry) {
71
72         this.componentUtils = componentUtils;
73         DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
74         this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
75         this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
76         this.consumerGroup = statusConfig.getConsumerGroup();
77         this.consumerId = statusConfig.getConsumerId();
78         this.distributionEngineClusterHealth = distributionEngineClusterHealth;
79         this.environmentEntry = environmentEntry;
80         this.distributionCompleteReporter = distributionCompleteReporter;
81     }
82
83     public void startTask(String topicName) {
84
85         this.topicName = topicName;
86         logger.debug("start task for polling topic {}", topicName);
87         if (fetchTimeoutInSec < 15) {
88             logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
89             fetchTimeoutInSec = 15;
90         }
91         try {
92             cambriaConsumer = cambriaHandler.createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), consumerId, consumerGroup,
93                     fetchTimeoutInSec * 1000);
94
95             if (scheduledPollingService != null) {
96                 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
97                 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
98
99             }
100         } catch (Exception e) {
101             logger.debug("unexpected error occured", e);
102             String methodName = Object.class.getEnclosingMethod().getName();
103             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
104         }
105     }
106
107     public void stopTask() {
108         if (scheduledFuture != null) {
109             boolean result = scheduledFuture.cancel(true);
110             logger.debug("Stop polling task. result = {}", result);
111             if (!result) {
112                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
113             }
114             scheduledFuture = null;
115         }
116         if (cambriaConsumer != null) {
117             logger.debug("close consumer");
118             cambriaHandler.closeConsumer(cambriaConsumer);
119         }
120
121     }
122
123     public void destroy() {
124         this.stopTask();
125         shutdownExecutor();
126     }
127
128     @Override
129     public void run() {
130         logger.trace("run() method. polling queue {}", topicName);
131
132         try {
133             // init error
134             if (cambriaConsumer == null) {
135                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
136                 stopTask();
137                 return;
138             }
139
140             Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
141             // fetch error
142             if (fetchResult.isRight()) {
143                 CambriaErrorResponse errorResponse = fetchResult.right().value();
144                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
145
146                 // TODO: if status== internal error (connection problem) change
147                 // state to inactive
148                 // in next try, if succeed - change to active
149                 return;
150             }
151
152             // success
153             Iterable<String> messages = fetchResult.left().value();
154             for (String message : messages) {
155                 logger.trace("received message {}", message);
156                 try {
157                     DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
158                     audit.startAuditFetchLog(PARTNER_NAME, DistributionEnginePollingTask.class.getName());
159                     handleDistributionNotificationMsg(notification, audit);
160                     distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
161                 } catch (Exception e) {
162                     logger.debug("failed to convert message to object", e);
163                     BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
164                 }
165
166             }
167         } catch (Exception e) {
168             logger.debug("unexpected error occurred", e);
169             String methodName = Object.class.getEnclosingMethod().getName();
170             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
171         }
172
173     }
174
175     private void handleDistributionNotificationMsg(DistributionStatusNotification notification, LoggerSdcAudit audit) {
176         componentUtils.auditDistributionStatusNotification(notification.getDistributionID(),
177                 notification.getConsumerID(), topicName, notification.getArtifactURL(),
178                 String.valueOf(notification.getTimestamp()), notification.getStatus().name(),
179                 notification.getErrorReason(), audit);
180         if (notification.isDistributionCompleteNotification()) {
181             distributionCompleteReporter.reportDistributionComplete(notification);
182         }
183     }
184
185     private void shutdownExecutor() {
186         if (scheduledPollingService == null)
187             return;
188
189         scheduledPollingService.shutdown(); // Disable new tasks from being
190         // submitted
191         try {
192             // Wait a while for existing tasks to terminate
193             if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
194                 scheduledPollingService.shutdownNow(); // Cancel currently
195                 // executing tasks
196                 // Wait a while for tasks to respond to being cancelled
197                 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
198                     logger.debug("Pool did not terminate");
199             }
200         } catch (InterruptedException ie) {
201             // (Re-)Cancel if current thread also interrupted
202             scheduledPollingService.shutdownNow();
203             // Preserve interrupt status
204             Thread.currentThread().interrupt();
205         }
206     }
207
208 }