[SDC] Add kafka native messaging
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEnginePollingTask.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.distribution.engine;
21
22 import com.att.nsa.cambria.client.CambriaConsumer;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import fj.data.Either;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
31 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
32 import org.openecomp.sdc.be.components.kafka.KafkaHandler;
33 import org.openecomp.sdc.be.config.BeEcompErrorManager;
34 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
35 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
36 import org.openecomp.sdc.be.impl.ComponentsUtils;
37 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
38 import org.openecomp.sdc.common.log.wrappers.Logger;
39 import org.openecomp.sdc.common.log.wrappers.LoggerSdcAudit;
40
41 public class DistributionEnginePollingTask implements Runnable {
42
43     public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
44     private static final String PARTNER_NAME = "UNKNOWN";
45     private static final Logger logger = Logger.getLogger(DistributionEnginePollingTask.class.getName());
46     private static LoggerSdcAudit audit = new LoggerSdcAudit(DistributionEnginePollingTask.class);
47     ScheduledFuture<?> scheduledFuture = null;
48     private String topicName;
49     private ComponentsUtils componentUtils;
50     private int fetchTimeoutInSec = 15;
51     private int pollingIntervalInSec;
52     private String consumerId;
53     private String consumerGroup;
54     private CambriaHandler cambriaHandler = new CambriaHandler();
55     private final KafkaHandler kafkaHandler = new KafkaHandler();
56     private Gson gson = new GsonBuilder().setPrettyPrinting().create();
57     private DistributionCompleteReporter distributionCompleteReporter;
58     private ScheduledExecutorService scheduledPollingService = Executors
59         .newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
60     private CambriaConsumer cambriaConsumer = null;
61     private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
62     private OperationalEnvironmentEntry environmentEntry;
63
64     public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration,
65                                          DistributionCompleteReporter distributionCompleteReporter, ComponentsUtils componentUtils,
66                                          DistributionEngineClusterHealth distributionEngineClusterHealth,
67                                          OperationalEnvironmentEntry environmentEntry) {
68         this.componentUtils = componentUtils;
69         DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
70         this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
71         this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
72         this.consumerGroup = statusConfig.getConsumerGroup();
73         this.consumerId = statusConfig.getConsumerId();
74         this.distributionEngineClusterHealth = distributionEngineClusterHealth;
75         this.environmentEntry = environmentEntry;
76         this.distributionCompleteReporter = distributionCompleteReporter;
77     }
78
79     public void startTask(String topicName) {
80         this.topicName = topicName;
81         logger.debug("start task for polling topic {}", topicName);
82         if (fetchTimeoutInSec < 15) {
83             logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
84             fetchTimeoutInSec = 15;
85         }
86         try {
87             if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
88                 cambriaConsumer = cambriaHandler
89                     .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(),
90                         environmentEntry.getUebSecretKey(),
91                         consumerId, consumerGroup, fetchTimeoutInSec * 1000);
92             }
93             if (scheduledPollingService != null) {
94                 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
95                 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
96             }
97         } catch (Exception e) {
98             logger.debug("unexpected error occured", e);
99             String methodName = Object.class.getEnclosingMethod().getName();
100             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
101         }
102     }
103
104     public void stopTask() {
105         if (scheduledFuture != null) {
106             boolean result = scheduledFuture.cancel(true);
107             logger.debug("Stop polling task. result = {}", result);
108             if (!result) {
109                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
110             }
111             scheduledFuture = null;
112         }
113         if (cambriaConsumer != null) {
114             logger.debug("close consumer");
115             cambriaHandler.closeConsumer(cambriaConsumer);
116         }
117     }
118
119     public void destroy() {
120         this.stopTask();
121         shutdownExecutor();
122     }
123
124     @Override
125     public void run() {
126         logger.trace("run() method. polling queue {}", topicName);
127         Either<Iterable<String>, CambriaErrorResponse> fetchResult;
128         try {
129             // init error
130             if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
131                 if (cambriaConsumer == null) {
132                     BeEcompErrorManager.getInstance()
133                         .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
134                     stopTask();
135                     return;
136                 }
137                 fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
138             } else {
139                 fetchResult = kafkaHandler.fetchFromTopic(topicName);
140             }
141             // fetch error
142             if (fetchResult.isRight()) {
143                 CambriaErrorResponse errorResponse = fetchResult.right().value();
144                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
145                     "failed to fetch messages from topic " + topicName + " error: " + errorResponse);
146                 // TODO: if status== internal error (connection problem) change
147
148                 // state to inactive
149
150                 // in next try, if succeed - change to active
151                 return;
152             }
153             // success
154             Iterable<String> messages = fetchResult.left().value();
155             for (String message : messages) {
156                 logger.trace("received message {}", message);
157                 try {
158                     DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
159                     audit.startAuditFetchLog(PARTNER_NAME, DistributionEnginePollingTask.class.getName());
160                     handleDistributionNotificationMsg(notification, audit);
161                     distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
162                 } catch (Exception e) {
163                     logger.debug("failed to convert message to object", e);
164                     BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING,
165                         "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
166                 }
167             }
168         } catch (Exception e) {
169             logger.debug("unexpected error occurred", e);
170             String methodName = Object.class.getEnclosingMethod().getName();
171             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
172         }
173     }
174
175     private void handleDistributionNotificationMsg(DistributionStatusNotification notification, LoggerSdcAudit audit) {
176         componentUtils.auditDistributionStatusNotification(notification.getDistributionID(), notification.getConsumerID(), topicName,
177             notification.getArtifactURL(), String.valueOf(notification.getTimestamp()), notification.getStatus().name(),
178             notification.getErrorReason(), audit);
179         if (notification.isDistributionCompleteNotification()) {
180             distributionCompleteReporter.reportDistributionComplete(notification);
181         }
182     }
183
184     private void shutdownExecutor() {
185         if (scheduledPollingService == null) {
186             return;
187         }
188         scheduledPollingService.shutdown(); // Disable new tasks from being
189
190         // submitted
191         try {
192             // Wait a while for existing tasks to terminate
193             if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
194                 scheduledPollingService.shutdownNow(); // Cancel currently
195
196                 // executing tasks
197
198                 // Wait a while for tasks to respond to being cancelled
199                 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
200                     logger.debug("Pool did not terminate");
201                 }
202             }
203         } catch (InterruptedException ie) {
204             // (Re-)Cancel if current thread also interrupted
205             scheduledPollingService.shutdownNow();
206             // Preserve interrupt status
207             Thread.currentThread().interrupt();
208         }
209     }
210 }