fc7c473d6b141b47b2dca81f513e9180c6b636db
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
29 import org.openecomp.sdc.be.config.BeEcompErrorManager;
30 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
32 import org.openecomp.sdc.be.impl.ComponentsUtils;
33 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
34 import org.openecomp.sdc.common.config.EcompErrorName;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.att.nsa.cambria.client.CambriaConsumer;
39 import com.google.gson.Gson;
40 import com.google.gson.GsonBuilder;
41
42 import fj.data.Either;
43
44 public class DistributionEnginePollingTask implements Runnable {
45
46         public static final String DISTRIBUTION_STATUS_POLLING = "distributionEngineStatusPolling";
47
48         private String topicName;
49         private ComponentsUtils componentUtils;
50         private int fetchTimeoutInSec = 15;
51         private int pollingIntervalInSec;
52         private String consumerId;
53         private String consumerGroup;
54         private DistributionEngineConfiguration distributionEngineConfiguration;
55
56         private CambriaHandler cambriaHandler = new CambriaHandler();
57         private Gson gson = new GsonBuilder().setPrettyPrinting().create();
58
59         private ScheduledExecutorService scheduledPollingService = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("TopicPollingThread-%d").build());
60
61         private static Logger logger = LoggerFactory.getLogger(DistributionEnginePollingTask.class.getName());
62
63         ScheduledFuture<?> scheduledFuture = null;
64         private CambriaConsumer cambriaConsumer = null;
65
66         private DistributionEngineClusterHealth distributionEngineClusterHealth = null;
67
68         public DistributionEnginePollingTask(DistributionEngineConfiguration distributionEngineConfiguration, String envName, ComponentsUtils componentUtils, DistributionEngineClusterHealth distributionEngineClusterHealth) {
69
70                 this.componentUtils = componentUtils;
71                 this.distributionEngineConfiguration = distributionEngineConfiguration;
72                 DistributionStatusTopicConfig statusConfig = distributionEngineConfiguration.getDistributionStatusTopic();
73                 this.pollingIntervalInSec = statusConfig.getPollingIntervalSec();
74                 this.fetchTimeoutInSec = statusConfig.getFetchTimeSec();
75                 this.consumerGroup = statusConfig.getConsumerGroup();
76                 this.consumerId = statusConfig.getConsumerId();
77                 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
78         }
79
80         public void startTask(String topicName) {
81
82                 this.topicName = topicName;
83                 logger.debug("start task for polling topic {}", topicName);
84                 if (fetchTimeoutInSec < 15) {
85                         logger.warn("fetchTimeout value should be greater or equal to 15 sec. use default");
86                         fetchTimeoutInSec = 15;
87                 }
88                 try {
89                         cambriaConsumer = cambriaHandler.createConsumer(distributionEngineConfiguration.getUebServers(), topicName, distributionEngineConfiguration.getUebPublicKey(), distributionEngineConfiguration.getUebSecretKey(), consumerId, consumerGroup,
90                                         fetchTimeoutInSec * 1000);
91
92                         if (scheduledPollingService != null) {
93                                 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
94                                 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
95
96                         }
97                 } catch (Exception e) {
98                         logger.debug("unexpected error occured", e);
99                         String methodName = new Object() {
100                         }.getClass().getEnclosingMethod().getName();
101
102                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
103                         BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
104                 }
105         }
106
107         public void stopTask() {
108                 if (scheduledFuture != null) {
109                         boolean result = scheduledFuture.cancel(true);
110                         logger.debug("Stop polling task. result = {}", result);
111                         if (false == result) {
112                                 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
113                                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "try to stop the polling task");
114                         }
115                         scheduledFuture = null;
116                 }
117                 if (cambriaConsumer != null) {
118                         logger.debug("close consumer");
119                         cambriaHandler.closeConsumer(cambriaConsumer);
120                 }
121
122         }
123
124         public void destroy() {
125                 this.stopTask();
126                 shutdownExecutor();
127         }
128
129         @Override
130         public void run() {
131                 logger.trace("run() method. polling queue {}", topicName);
132
133                 try {
134                         // init error
135                         if (cambriaConsumer == null) {
136                                 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
137                                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
138                                 stopTask();
139                                 return;
140                         }
141
142                         Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
143                         // fetch error
144                         if (fetchResult.isRight()) {
145                                 CambriaErrorResponse errorResponse = fetchResult.right().value();
146                                 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
147                                 BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to fetch messages from topic " + topicName + " error: " + fetchResult.right().value());
148
149                                 // TODO: if status== internal error (connection problem) change
150                                 // state to inactive
151                                 // in next try, if succeed - change to active
152                                 return;
153                         }
154
155                         // success
156                         Iterable<String> messages = fetchResult.left().value();
157                         for (String message : messages) {
158                                 logger.trace("received message {}", message);
159                                 try {
160                                         DistributionStatusNotification notification = gson.fromJson(message, DistributionStatusNotification.class);
161                                         componentUtils.auditDistributionStatusNotification(AuditingActionEnum.DISTRIBUTION_STATUS, notification.getDistributionID(), notification.getConsumerID(), topicName, notification.getArtifactURL(),
162                                                         String.valueOf(notification.getTimestamp()), notification.getStatus().name(), notification.getErrorReason());
163
164                                         distributionEngineClusterHealth.setHealthCheckOkAndReportInCaseLastStateIsDown();
165
166                                 } catch (Exception e) {
167                                         logger.debug("failed to convert message to object", e);
168                                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
169                                         BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "failed to parse message " + message + " from topic " + topicName + " error: " + fetchResult.right().value());
170                                 }
171
172                         }
173                 } catch (Exception e) {
174                         logger.debug("unexpected error occured", e);
175                         String methodName = new Object() {
176                         }.getClass().getEnclosingMethod().getName();
177
178                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeDistributionEngineSystemError, methodName, e.getMessage());
179                         BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(methodName, e.getMessage());
180                 }
181
182         }
183
184         private void shutdownExecutor() {
185                 if (scheduledPollingService == null)
186                         return;
187
188                 scheduledPollingService.shutdown(); // Disable new tasks from being
189                                                                                         // submitted
190                 try {
191                         // Wait a while for existing tasks to terminate
192                         if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS)) {
193                                 scheduledPollingService.shutdownNow(); // Cancel currently
194                                                                                                                 // executing tasks
195                                 // Wait a while for tasks to respond to being cancelled
196                                 if (!scheduledPollingService.awaitTermination(60, TimeUnit.SECONDS))
197                                         logger.debug("Pool did not terminate");
198                         }
199                 } catch (InterruptedException ie) {
200                         // (Re-)Cancel if current thread also interrupted
201                         scheduledPollingService.shutdownNow();
202                         // Preserve interrupt status
203                         Thread.currentThread().interrupt();
204                 }
205         }
206
207 }