cebedadf108c74cb361d6b36eeb7c0ee63f3c858
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.distribution.engine;
21
22 import lombok.Setter;
23 import org.apache.commons.collections.CollectionUtils;
24 import org.apache.commons.lang3.StringUtils;
25 import org.openecomp.sdc.be.components.kafka.KafkaHandler;
26 import org.openecomp.sdc.be.components.validation.ServiceDistributionValidation;
27 import org.openecomp.sdc.be.config.BeEcompErrorManager;
28 import org.openecomp.sdc.be.config.ConfigurationManager;
29 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
30 import org.openecomp.sdc.be.dao.api.ActionStatus;
31 import org.openecomp.sdc.be.model.Service;
32 import org.openecomp.sdc.be.model.User;
33 import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Component;
38
39 import javax.annotation.PostConstruct;
40 import javax.annotation.PreDestroy;
41 import javax.annotation.Resource;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Optional;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.regex.Matcher;
48 import java.util.regex.Pattern;
49
50 @Component("distributionEngine")
51 public class DistributionEngine implements IDistributionEngine {
52
53     private static final Logger logger = Logger.getLogger(DistributionEngine.class.getName());
54     private static final Pattern FQDN_PATTERN = Pattern.compile(
55             "^([A-Z0-9]|[A-Z0-9][A-Z0-9\\-]{0,61}[A-Z0-9])(\\.([A-Z0-9]|[A-Z0-9][A-Z0-9\\-]{0,61}[A-Z0-9]))*(:[0-9]{2,4})*$",
56             Pattern.CASE_INSENSITIVE);
57     @Autowired
58     private EnvironmentsEngine environmentsEngine;
59     @Autowired
60     @Setter
61     private KafkaHandler kafkaHandler;
62     @Resource
63     private DistributionNotificationSender distributionNotificationSender;
64     @Resource
65     private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder;
66     @Resource
67     private DistributionEngineClusterHealth distributionEngineClusterHealth;
68     @Resource
69     private ServiceDistributionValidation serviceDistributionValidation;
70     private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
71     private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
72     private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
73
74     @Override
75     public boolean isActive() {
76         if (envNamePerInitTask.isEmpty()) {
77             return false;
78         }
79         for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
80             boolean active = task.isActive();
81             if (!active) {
82                 return false;
83             }
84         }
85         return true;
86     }
87
88     @PostConstruct
89     private void init() {
90         logger.trace("Enter init method of DistributionEngine");
91         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
92                 .getDistributionEngineConfiguration();
93         boolean startDistributionEngine = distributionEngineConfiguration.isStartDistributionEngine();
94         logger.debug("Distribution engine activation parameter is {}", startDistributionEngine);
95         if (!startDistributionEngine) {
96             logger.info("The distribution engine is disabled");
97             this.distributionEngineClusterHealth.setHealthCheckUebIsDisabled();
98             return;
99         }
100         boolean isValidConfig = validateConfiguration(distributionEngineConfiguration);
101         if (!isValidConfig) {
102             BeEcompErrorManager.getInstance()
103                     .logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase");
104             this.distributionEngineClusterHealth.setHealthCheckUebConfigurationError();
105             return;
106         }
107         List<String> environments = distributionEngineConfiguration.getEnvironments();
108         for (String envName : environments) {
109             logger.debug("init task for environment {}", envName);
110             AtomicBoolean status = new AtomicBoolean(false);
111             envNamePerStatus.put(envName, status);
112             environmentsEngine.connectUebTopicForDistributionConfTopic(envName, status, envNamePerInitTask, envNamePerPollingTask);
113         }
114         logger.debug("init UEB health check");
115         distributionEngineClusterHealth.startHealthCheckTask(envNamePerStatus);
116         logger.trace("Exit init method of DistributionEngine");
117     }
118
119     @PreDestroy
120     public void shutdown() {
121         logger.info("distribution engine shutdown - start");
122         if (envNamePerInitTask != null) {
123             for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
124                 task.destroy();
125             }
126         }
127         if (envNamePerPollingTask != null) {
128             for (DistributionEnginePollingTask task : envNamePerPollingTask.values()) {
129                 task.destroy();
130             }
131         }
132     }
133
134     /**
135      * validate mandatory configuration parameters received
136      *
137      * @param deConfiguration: distribution engine configuration
138      * @return boolean result: true of false
139      */
140     protected boolean validateConfiguration(DistributionEngineConfiguration deConfiguration) {
141         String methodName = "validateConfiguration";
142         boolean result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments");
143
144         if (!kafkaHandler.isKafkaActive()) {
145             result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers") && result;
146             result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result;
147             result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result;
148             result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result;
149         }
150         result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result;
151         result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result;
152         result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result;
153         result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result;
154         result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result;
155         result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result;
156         result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerGroup(), methodName, "consumerGroup") && result;
157         result = isValidObject(deConfiguration.getDistributionStatusTopic().getFetchTimeSec(), methodName, "fetchTimeSec") && result;
158         result = isValidObject(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec(), methodName, "pollingIntervalSec") && result;
159         return result;
160     }
161
162     private boolean isValidServers(List<String> uebServers, String methodName, String paramName) {
163         if (uebServers == null || uebServers.isEmpty()) {
164             BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
165             return false;
166         }
167         if (uebServers.size() < 2) {
168             BeEcompErrorManager.getInstance().logBeConfigurationInvalidListSizeError(methodName, paramName, 2);
169             return false;
170         }
171         for (String serverFqdn : uebServers) {
172             if (!isValidFqdn(serverFqdn)) {
173                 BeEcompErrorManager.getInstance().logBeInvalidConfigurationError(methodName, paramName, serverFqdn);
174                 return false;
175             }
176         }
177         return true;
178     }
179
180     private boolean isValidFqdn(String serverFqdn) {
181         try {
182             Matcher matcher = FQDN_PATTERN.matcher(serverFqdn);
183             return matcher.matches();
184         } catch (Exception e) {
185             logger.debug("Failed to match value of address {}", serverFqdn, e);
186             return false;
187         }
188     }
189
190     private boolean isValidParam(String paramValue, String methodName, String paramName) {
191         if (StringUtils.isEmpty(paramValue)) {
192             BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
193             return false;
194         }
195         return true;
196     }
197
198     private boolean isValidParam(List<String> paramValue, String methodName, String paramName) {
199         if (CollectionUtils.isEmpty(paramValue)) {
200             BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
201             return false;
202         }
203         return true;
204     }
205
206     private boolean isValidObject(Object paramValue, String methodName, String paramName) {
207         if (paramValue == null) {
208             BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
209             return false;
210         }
211         return true;
212     }
213
214     private String getEnvironmentErrorDescription(StorageOperationStatus status) {
215         switch (status) {
216             case DISTR_ENVIRONMENT_NOT_AVAILABLE:
217                 return "environment is unavailable";
218             case DISTR_ENVIRONMENT_NOT_FOUND:
219                 return "environment is not configured in our system";
220             case DISTR_ENVIRONMENT_SENT_IS_INVALID:
221                 return "environment name is invalid";
222             default:
223                 return "unkhown";
224         }
225     }
226
227     @Override
228     public StorageOperationStatus isEnvironmentAvailable(String envName) {
229         if (envName == null || envName.isEmpty()) {
230             return StorageOperationStatus.DISTR_ENVIRONMENT_SENT_IS_INVALID;
231         }
232         AtomicBoolean status = envNamePerStatus.get(envName);
233         if (status == null) {
234             return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_FOUND;
235         }
236         if (!status.get()) {
237             return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_AVAILABLE;
238         }
239         return StorageOperationStatus.OK;
240     }
241
242     @Override
243     public StorageOperationStatus isEnvironmentAvailable() {
244         String envName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getEnvironments().get(0);
245         return isEnvironmentAvailable(envName);
246     }
247
248     @Override
249     public void disableEnvironment(String envName) {
250         AtomicBoolean status = envNamePerStatus.get(envName);
251         status.set(false);
252     }
253
254     @Override
255     public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, User modifier) {
256         return notifyService(distributionId, service, notificationData, envName, envName, modifier);
257     }
258
259     @Override
260     public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envId, String envName,
261                                       User modifier) {
262         logger.debug(
263                 "Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}",
264                 distributionId, service.getUUID(), service.getUniqueId(), envName, service.getLastUpdaterUserId(), modifier);
265         String topicName = buildTopicName(envName);
266         ActionStatus notifyServiceStatus = Optional.ofNullable(environmentsEngine.getEnvironmentById(envId)).map(EnvironmentMessageBusData::new).map(
267                         messageBusData -> distributionNotificationSender
268                                 .sendNotification(topicName, distributionId, messageBusData, notificationData, service, modifier))
269                 .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE);
270         logger.debug("Finish notifyService. status is {}", notifyServiceStatus);
271         return notifyServiceStatus;
272     }
273
274     private String buildTopicName(String envName) {
275         DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
276         String distributionNotifTopicName = deConfiguration.getDistributionNotifTopicName();
277         return DistributionEngineInitTask.buildTopicName(distributionNotifTopicName, envName);
278     }
279
280     @Override
281     public StorageOperationStatus isReadyForDistribution(String envName) {
282         StorageOperationStatus status = isEnvironmentAvailable(envName);
283         if (status != StorageOperationStatus.OK) {
284             String envErrorDec = getEnvironmentErrorDescription(status);
285             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING,
286                     "Environment name " + envName + " is not available. Reason : " + envErrorDec);
287         }
288         return status;
289     }
290
291     @Override
292     public OperationalEnvironmentEntry getEnvironmentById(String opEnvId) {
293         return environmentsEngine.getEnvironmentById(opEnvId);
294     }
295
296     @Override
297     public OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
298         return environmentsEngine.getEnvironmentByDmaapUebAddress(dmaapUebAddress);
299     }
300
301     @Override
302     public INotificationData buildServiceForDistribution(Service service, String distributionId, String workloadContext) {
303         INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service, distributionId, workloadContext);
304         value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service);
305         return value;
306     }
307 }