1eeaa1229ec36f45a99aa8e99549f168eca7fbee
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import java.util.HashSet;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 import org.openecomp.sdc.be.config.BeEcompErrorManager;
32 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
33 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
34 import org.openecomp.sdc.be.impl.ComponentsUtils;
35 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
36 import org.openecomp.sdc.common.config.EcompErrorName;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import fj.data.Either;
41
42 public class DistributionEngineInitTask implements Runnable {
43
44         public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
45         public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
46         public static final String CONSUMER = "CONSUMER";
47         public static final String PRODUCER = "PRODUCER";
48         public static final String CREATED = "CREATED";
49         public static final String FAILED = "FAILED";
50         public static final Integer HTTP_OK = 200;
51
52         private Long delayBeforeStartFlow = 0l;
53         private DistributionEngineConfiguration deConfiguration;
54         private String envName;
55         private long retryInterval;
56         private long currentRetryInterval;
57         private long maxInterval;
58         // private boolean active = false;
59         boolean maximumRetryInterval = false;
60         private AtomicBoolean status = null;
61         ComponentsUtils componentsUtils = null;
62         DistributionEnginePollingTask distributionEnginePollingTask = null;
63
64         private CambriaHandler cambriaHandler = new CambriaHandler();
65
66         public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask) {
67                 super();
68                 this.delayBeforeStartFlow = delayBeforeStartFlow;
69                 this.deConfiguration = deConfiguration;
70                 this.envName = envName;
71                 this.retryInterval = deConfiguration.getInitRetryIntervalSec();
72                 this.currentRetryInterval = retryInterval;
73                 this.maxInterval = deConfiguration.getInitMaxIntervalSec();
74                 this.status = status;
75                 this.componentsUtils = componentsUtils;
76                 this.distributionEnginePollingTask = distributionEnginePollingTask;
77         }
78
79         private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
80
81         private static Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class.getName());
82
83         ScheduledFuture<?> scheduledFuture = null;
84
85         public void startTask() {
86                 if (scheduledExecutorService != null) {
87                         Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
88                         logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
89                         this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
90
91                 }
92         }
93
94         public void restartTask() {
95
96                 this.stopTask();
97
98                 logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
99
100                 long lastCurrentInterval = currentRetryInterval;
101                 incrementRetryInterval();
102
103                 this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
104
105         }
106
107         protected void incrementRetryInterval() {
108                 if (currentRetryInterval < maxInterval) {
109                         currentRetryInterval *= 2;
110                         if (currentRetryInterval > maxInterval) {
111                                 setMaxRetryInterval();
112                         }
113                 } else {
114                         setMaxRetryInterval();
115                 }
116         }
117
118         private void setMaxRetryInterval() {
119                 currentRetryInterval = maxInterval;
120                 maximumRetryInterval = true;
121                 logger.debug("Set next retry init interval to {}", maxInterval);
122         }
123
124         public void stopTask() {
125                 if (scheduledFuture != null) {
126                         boolean result = scheduledFuture.cancel(true);
127                         logger.debug("Stop reinit task. result = {}", result);
128                         if (false == result) {
129                                 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
130                                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
131                         }
132                         scheduledFuture = null;
133                 }
134         }
135
136         public void destroy() {
137                 this.stopTask();
138                 if (scheduledExecutorService != null) {
139                         scheduledExecutorService.shutdown();
140                 }
141         }
142
143         @Override
144         public void run() {
145
146                 boolean result = false;
147                 result = initFlow();
148
149                 if (true == result) {
150                         this.stopTask();
151                         this.status.set(true);
152                         if (this.distributionEnginePollingTask != null) {
153                                 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
154                                 logger.debug("start polling distribution status topic {}", topicName);
155                                 this.distributionEnginePollingTask.startTask(topicName);
156                         }
157                 } else {
158                         if (false == maximumRetryInterval) {
159                                 this.restartTask();
160                         }
161                 }
162         }
163
164         /**
165          * run initialization flow
166          * 
167          * @return
168          */
169         public boolean initFlow() {
170
171                 logger.trace("Start init flow for environment {}", this.envName);
172
173                 Set<String> topicsList = null;
174                 Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
175
176                 getTopicsRes = cambriaHandler.getTopics(deConfiguration.getUebServers());
177                 if (getTopicsRes.isRight()) {
178                         CambriaErrorResponse status = getTopicsRes.right().value();
179                         if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
180                                 topicsList = new HashSet<>();
181                         } else {
182                                 BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
183
184                                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
185
186                                 return false;
187                         }
188                 } else {
189                         topicsList = getTopicsRes.left().value();
190                 }
191
192                 String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
193                 logger.debug("Going to handle topic {}", notificationTopic);
194
195                 boolean status = createTopicIfNotExists(topicsList, notificationTopic);
196                 if (false == status) {
197                         return false;
198                 }
199
200                 CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
201
202                 CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
203
204                 if (createStatus != CambriaOperationStatus.OK) {
205                         return false;
206                 }
207
208                 String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
209                 logger.debug("Going to handle topic {}", statusTopic);
210                 status = createTopicIfNotExists(topicsList, statusTopic);
211                 if (false == status) {
212                         return false;
213                 }
214
215                 CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
216
217                 if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
218                         return false;
219                 }
220
221                 return true;
222         }
223
224         private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
225                 CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(deConfiguration.getUebServers(), topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebPublicKey(), subscriberType);
226
227                 String role = CONSUMER;
228                 if (subscriberType == SubscriberTypeEnum.PRODUCER) {
229                         role = PRODUCER;
230                 }
231                 auditRegistration(topicName, registerStatus, role);
232                 return registerStatus;
233         }
234
235         private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
236                 if (componentsUtils != null) {
237                         Integer httpCode = registerProducerStatus.getHttpCode();
238                         String httpCodeStr = String.valueOf(httpCode);
239                         this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, deConfiguration.getUebPublicKey(), httpCodeStr);
240                 }
241         }
242
243         private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
244
245                 if (topicsList.contains(topicName)) {
246                         if (componentsUtils != null) {
247                                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
248                         }
249                         return true;
250                 }
251
252                 CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(deConfiguration.getUebServers(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
253                                 deConfiguration.getCreateTopic().getReplicationCount());
254
255                 CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
256                 if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
257                         if (componentsUtils != null) {
258                                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
259                         }
260                 } else if (status == CambriaOperationStatus.OK) {
261                         if (componentsUtils != null) {
262                                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
263                         }
264                 } else {
265                         if (componentsUtils != null) {
266                                 this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
267                         }
268                         BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
269
270                         BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
271
272                         return false;
273                 }
274
275                 return true;
276         }
277
278         public static String buildTopicName(String topicName, String environment) {
279                 return topicName + "-" + environment.toUpperCase();
280         }
281
282         public boolean isActive() {
283                 return this.status.get();
284         }
285
286         public long getCurrentRetryInterval() {
287                 return currentRetryInterval;
288         }
289
290         protected void setCambriaHandler(CambriaHandler cambriaHandler) {
291                 this.cambriaHandler = cambriaHandler;
292         }
293 }