d61e15016a38e4f439eac6736c8d239d1f402dfe
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.distribution.engine;
21
22 import fj.data.Either;
23 import java.util.HashSet;
24 import java.util.Set;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.stream.Collectors;
31 import org.openecomp.sdc.be.config.BeEcompErrorManager;
32 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
33 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
34 import org.openecomp.sdc.be.impl.ComponentsUtils;
35 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
36 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
37 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
38 import org.openecomp.sdc.common.log.wrappers.Logger;
39
40 public class DistributionEngineInitTask implements Runnable {
41
42     public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
43     public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
44     public static final String CONSUMER = "CONSUMER";
45     public static final String PRODUCER = "PRODUCER";
46     public static final String CREATED = "CREATED";
47     public static final String FAILED = "FAILED";
48     public static final Integer HTTP_OK = 200;
49     private static final Logger logger = Logger.getLogger(DistributionEngineInitTask.class.getName());
50     boolean maximumRetryInterval = false;
51     ComponentsUtils componentsUtils = null;
52     DistributionEnginePollingTask distributionEnginePollingTask = null;
53     ScheduledFuture<?> scheduledFuture = null;
54     private Long delayBeforeStartFlow = 0l;
55     private DistributionEngineConfiguration deConfiguration;
56     private String envName;
57     private long retryInterval;
58     private long currentRetryInterval;
59     private long maxInterval;
60     private AtomicBoolean status = null;
61     private OperationalEnvironmentEntry environmentEntry;
62     private CambriaHandler cambriaHandler = new CambriaHandler();
63     private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
64
65     public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
66                                       AtomicBoolean status, ComponentsUtils componentsUtils,
67                                       DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
68         super();
69         this.delayBeforeStartFlow = delayBeforeStartFlow;
70         this.deConfiguration = deConfiguration;
71         this.envName = envName;
72         this.retryInterval = deConfiguration.getInitRetryIntervalSec();
73         this.currentRetryInterval = retryInterval;
74         this.maxInterval = deConfiguration.getInitMaxIntervalSec();
75         this.status = status;
76         this.componentsUtils = componentsUtils;
77         this.distributionEnginePollingTask = distributionEnginePollingTask;
78         this.environmentEntry = environmentEntry;
79     }
80
81     public static String buildTopicName(String topicName, String environment) {
82         return topicName + "-" + environment.toUpperCase();
83     }
84
85     public void startTask() {
86         if (scheduledExecutorService != null) {
87             Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
88             logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval,
89                 delayBeforeStartFlow);
90             this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
91         }
92     }
93
94     public void restartTask() {
95         this.stopTask();
96         logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
97         long lastCurrentInterval = currentRetryInterval;
98         incrementRetryInterval();
99         this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
100     }
101
102     protected void incrementRetryInterval() {
103         if (currentRetryInterval < maxInterval) {
104             currentRetryInterval *= 2;
105             if (currentRetryInterval > maxInterval) {
106                 setMaxRetryInterval();
107             }
108         } else {
109             setMaxRetryInterval();
110         }
111     }
112
113     private void setMaxRetryInterval() {
114         currentRetryInterval = maxInterval;
115         maximumRetryInterval = true;
116         logger.debug("Set next retry init interval to {}", maxInterval);
117     }
118
119     public void stopTask() {
120         if (scheduledFuture != null) {
121             boolean result = scheduledFuture.cancel(true);
122             logger.debug("Stop reinit task. result = {}", result);
123             if (!result) {
124                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
125             }
126             scheduledFuture = null;
127         }
128     }
129
130     public void destroy() {
131         this.stopTask();
132         if (scheduledExecutorService != null) {
133             scheduledExecutorService.shutdown();
134         }
135     }
136
137     @Override
138     public void run() {
139         boolean result = false;
140         result = initFlow();
141         if (result) {
142             this.stopTask();
143             this.status.set(true);
144             if (this.distributionEnginePollingTask != null) {
145                 String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
146                 logger.debug("start polling distribution status topic {}", topicName);
147                 this.distributionEnginePollingTask.startTask(topicName);
148             }
149         } else {
150             if (!maximumRetryInterval) {
151                 this.restartTask();
152             }
153         }
154     }
155
156     /**
157      * run initialization flow
158      *
159      * @return
160      */
161     public boolean initFlow() {
162         logger.trace("Start init flow for environment {}", this.envName);
163         Set<String> topicsList = null;
164         Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
165         getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
166         if (getTopicsRes.isRight()) {
167             CambriaErrorResponse status = getTopicsRes.right().value();
168             if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
169                 topicsList = new HashSet<>();
170             } else {
171                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
172                 return false;
173             }
174         } else {
175             topicsList = getTopicsRes.left().value();
176         }
177         String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
178         logger.debug("Going to handle topic {}", notificationTopic);
179         if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
180             return false;
181         }
182         CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
183         CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
184         if (createStatus != CambriaOperationStatus.OK) {
185             return false;
186         }
187         String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
188         logger.debug("Going to handle topic {}", statusTopic);
189         if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
190             return false;
191         }
192         CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
193         return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
194     }
195
196     private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
197         CambriaErrorResponse registerStatus = cambriaHandler
198             .registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
199                 environmentEntry.getUebApikey(), subscriberType, topicName);
200
201         if (CambriaOperationStatus.AUTHENTICATION_ERROR.equals(registerStatus.getOperationStatus())
202                 || CambriaOperationStatus.CONNNECTION_ERROR.equals(registerStatus.getOperationStatus())){
203             registerStatus = cambriaHandler
204                     .registerToTopic(environmentEntry.getDmaapUebAddress(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(),
205                             environmentEntry.getUebApikey(), subscriberType, topicName);
206         }
207
208         String role = CONSUMER;
209         if (subscriberType == SubscriberTypeEnum.PRODUCER) {
210             role = PRODUCER;
211         }
212         auditRegistration(topicName, registerStatus, role);
213         return registerStatus;
214     }
215
216     private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
217         if (componentsUtils != null) {
218             Integer httpCode = registerProducerStatus.getHttpCode();
219             String httpCodeStr = String.valueOf(httpCode);
220             this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName,
221                 DistributionTopicData.newBuilder().notificationTopic(notificationTopic).build(), role, environmentEntry.getUebApikey(), httpCodeStr);
222         }
223     }
224
225     private boolean createStatusTopicIfNotExists(Set<String> topicsList, String topicName) {
226         DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().statusTopic(topicName).build();
227         return createDistributionTopic(topicsList, topicName, distributionTopicData);
228     }
229
230     private boolean createNotificationTopicIfNotExists(Set<String> topicsList, String topicName) {
231         DistributionTopicData distributionTopicData = DistributionTopicData.newBuilder().notificationTopic(topicName).build();
232         return createDistributionTopic(topicsList, topicName, distributionTopicData);
233     }
234
235     private boolean createDistributionTopic(Set<String> topicsList, String topicName, DistributionTopicData distributionTopicData) {
236         boolean isSucceeded = true;
237         if (topicsList.contains(topicName)) {
238             if (componentsUtils != null) {
239                 componentsUtils
240                     .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
241             }
242             return isSucceeded;
243         }
244         CambriaErrorResponse createDistribTopicStatus = cambriaHandler
245             .createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName,
246                 deConfiguration.getCreateTopic().getPartitionCount(), deConfiguration.getCreateTopic().getReplicationCount());
247         CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
248         switch (status) {
249             case OK:
250                 if (componentsUtils != null) {
251                     componentsUtils
252                         .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, CREATED);
253                 }
254                 break;
255             case TOPIC_ALREADY_EXIST:
256                 if (componentsUtils != null) {
257                     componentsUtils
258                         .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, ALREADY_EXISTS);
259                 }
260                 break;
261             default:
262                 if (componentsUtils != null) {
263                     componentsUtils
264                         .auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, distributionTopicData, FAILED);
265                 }
266                 BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
267                 isSucceeded = false;
268                 break;
269         }
270         return isSucceeded;
271     }
272
273     public boolean isActive() {
274         return this.status.get();
275     }
276
277     public long getCurrentRetryInterval() {
278         return currentRetryInterval;
279     }
280
281     protected void setCambriaHandler(CambriaHandler cambriaHandler) {
282         this.cambriaHandler = cambriaHandler;
283     }
284 }