2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.distribution.engine;
23 import org.apache.commons.collections.CollectionUtils;
24 import org.apache.commons.lang3.StringUtils;
25 import org.openecomp.sdc.be.components.kafka.KafkaHandler;
26 import org.openecomp.sdc.be.components.validation.ServiceDistributionValidation;
27 import org.openecomp.sdc.be.config.BeEcompErrorManager;
28 import org.openecomp.sdc.be.config.ConfigurationManager;
29 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
30 import org.openecomp.sdc.be.dao.api.ActionStatus;
31 import org.openecomp.sdc.be.model.Service;
32 import org.openecomp.sdc.be.model.User;
33 import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Component;
39 import javax.annotation.PostConstruct;
40 import javax.annotation.PreDestroy;
41 import javax.annotation.Resource;
42 import java.util.HashMap;
43 import java.util.List;
45 import java.util.Optional;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.regex.Matcher;
48 import java.util.regex.Pattern;
50 @Component("distributionEngine")
51 public class DistributionEngine implements IDistributionEngine {
// Class-wide logger, named after this class.
53 private static final Logger logger = Logger.getLogger(DistributionEngine.class.getName());
// Case-insensitive pattern for a server address: one or more dot-separated labels of 1-63
// letters, digits or hyphens (a label may not start or end with a hyphen), optionally followed
// by ":" and a 2-4 digit port.
// NOTE(review): the trailing "(:[0-9]{2,4})*" accepts repeated port groups and rejects 5-digit
// ports - confirm that this is intended.
54 private static final Pattern FQDN_PATTERN = Pattern.compile(
55 "^([A-Z0-9]|[A-Z0-9][A-Z0-9\\-]{0,61}[A-Z0-9])(\\.([A-Z0-9]|[A-Z0-9][A-Z0-9\\-]{0,61}[A-Z0-9]))*(:[0-9]{2,4})*$",
56 Pattern.CASE_INSENSITIVE);
// Collaborators of the engine.
// NOTE(review): no injection annotations are visible on these fields in this view - confirm how they are wired.
// Connects the per-environment topics during init and resolves operational environments by id or address.
58 private EnvironmentsEngine environmentsEngine;
// Consulted by validateConfiguration to decide whether the UEB-specific parameters must be checked.
61 private KafkaHandler kafkaHandler;
// Sends the distribution notification in notifyService.
63 private DistributionNotificationSender distributionNotificationSender;
// Builds the notification data in buildServiceForDistribution.
65 private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder;
// Receives the health-check state (disabled / configuration error) and runs the health check task.
67 private DistributionEngineClusterHealth distributionEngineClusterHealth;
69 private ServiceDistributionValidation serviceDistributionValidation;
// Per-environment bookkeeping, keyed by environment name. The two task maps are handed to
// environmentsEngine during init; the status map holds the flag read by isEnvironmentAvailable.
// NOTE(review): no java.util.Map import is visible in this view.
70 private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
71 private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
72 private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
// Reports whether the distribution engine is active, judged from the per-environment init tasks.
// NOTE(review): the return statements and closing braces of this method are not visible in this
// view, so the result for an empty map and for an inactive task cannot be confirmed from here.
75 public boolean isActive() {
// No init task has been registered for any environment.
76 if (envNamePerInitTask.isEmpty()) {
// Ask each registered init task for its own active flag.
79 for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
80 boolean active = task.isActive();
// Body of the engine's init flow (the trace messages below call it the "init method of DistributionEngine").
// NOTE(review): the method header and the statements that close the two early-exit branches are
// not visible in this view.
90 logger.trace("Enter init method of DistributionEngine");
91 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
92 .getDistributionEngineConfiguration();
93 boolean startDistributionEngine = distributionEngineConfiguration.isStartDistributionEngine();
94 logger.debug("Distribution engine activation parameter is {}", startDistributionEngine);
// Engine switched off by configuration: record the UEB health check as disabled.
95 if (!startDistributionEngine) {
96 logger.info("The distribution engine is disabled");
97 this.distributionEngineClusterHealth.setHealthCheckUebIsDisabled();
// Mandatory configuration failed validation: log a UEB system error and flag a configuration
// error on the health check.
100 boolean isValidConfig = validateConfiguration(distributionEngineConfiguration);
101 if (!isValidConfig) {
102 BeEcompErrorManager.getInstance()
103 .logBeUebSystemError(DistributionEngineInitTask.INIT_DISTRIBUTION_ENGINE_FLOW, "validate distribution configuration in init phase");
104 this.distributionEngineClusterHealth.setHealthCheckUebConfigurationError();
// For every configured environment: register a status flag that starts as false, then call
// environmentsEngine.connectUebTopicForDistributionConfTopic with the flag and both task maps.
107 List<String> environments = distributionEngineConfiguration.getEnvironments();
108 for (String envName : environments) {
109 logger.debug("init task for environment {}", envName);
110 AtomicBoolean status = new AtomicBoolean(false);
111 envNamePerStatus.put(envName, status);
112 environmentsEngine.connectUebTopicForDistributionConfTopic(envName, status, envNamePerInitTask, envNamePerPollingTask);
// Start the health check task over the per-environment status flags.
114 logger.debug("init UEB health check");
115 distributionEngineClusterHealth.startHealthCheckTask(envNamePerStatus);
116 logger.trace("Exit init method of DistributionEngine");
// Shuts the distribution engine down: logs the start of the shutdown, then walks over the
// registered init tasks and polling tasks.
// NOTE(review): the statements inside both loops are not visible in this view - confirm what is
// invoked on each task.
120 public void shutdown() {
121 logger.info("distribution engine shutdown - start");
// Both maps are initialised at declaration, so these null checks are purely defensive.
122 if (envNamePerInitTask != null) {
123 for (DistributionEngineInitTask task : envNamePerInitTask.values()) {
127 if (envNamePerPollingTask != null) {
128 for (DistributionEnginePollingTask task : envNamePerPollingTask.values()) {
135 * validate mandatory configuration parameters received
137 * @param deConfiguration: distribution engine configuration
138 * @return boolean result: true of false
140 protected boolean validateConfiguration(DistributionEngineConfiguration deConfiguration) {
141 String methodName = "validateConfiguration";
142 boolean result = isValidParam(deConfiguration.getEnvironments(), methodName, "environments");
144 if (!kafkaHandler.isKafkaActive()) {
145 result = isValidServers(deConfiguration.getUebServers(), methodName, "uebServers") && result;
146 result = isValidParam(deConfiguration.getUebPublicKey(), methodName, "uebPublicKey") && result;
147 result = isValidParam(deConfiguration.getUebSecretKey(), methodName, "uebSecretKey") && result;
148 result = isValidObject(deConfiguration.getCreateTopic(), methodName, "createTopic") && result;
150 result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result;
151 result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result;
152 result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result;
153 result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result;
154 result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result;
155 result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result;
156 result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerGroup(), methodName, "consumerGroup") && result;
157 result = isValidObject(deConfiguration.getDistributionStatusTopic().getFetchTimeSec(), methodName, "fetchTimeSec") && result;
158 result = isValidObject(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec(), methodName, "pollingIntervalSec") && result;
162 private boolean isValidServers(List<String> uebServers, String methodName, String paramName) {
163 if (uebServers == null || uebServers.isEmpty()) {
164 BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
167 if (uebServers.size() < 2) {
168 BeEcompErrorManager.getInstance().logBeConfigurationInvalidListSizeError(methodName, paramName, 2);
171 for (String serverFqdn : uebServers) {
172 if (!isValidFqdn(serverFqdn)) {
173 BeEcompErrorManager.getInstance().logBeInvalidConfigurationError(methodName, paramName, serverFqdn);
180 private boolean isValidFqdn(String serverFqdn) {
182 Matcher matcher = FQDN_PATTERN.matcher(serverFqdn);
183 return matcher.matches();
184 } catch (Exception e) {
185 logger.debug("Failed to match value of address {}", serverFqdn, e);
190 private boolean isValidParam(String paramValue, String methodName, String paramName) {
191 if (StringUtils.isEmpty(paramValue)) {
192 BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
198 private boolean isValidParam(List<String> paramValue, String methodName, String paramName) {
199 if (CollectionUtils.isEmpty(paramValue)) {
200 BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
206 private boolean isValidObject(Object paramValue, String methodName, String paramName) {
207 if (paramValue == null) {
208 BeEcompErrorManager.getInstance().logBeMissingConfigurationError(methodName, paramName);
// Maps an environment availability status to the human-readable reason that
// isReadyForDistribution appends to its error log.
// NOTE(review): the switch header and the fallback branch for any other status are not visible
// in this view.
214 private String getEnvironmentErrorDescription(StorageOperationStatus status) {
216 case DISTR_ENVIRONMENT_NOT_AVAILABLE:
217 return "environment is unavailable";
218 case DISTR_ENVIRONMENT_NOT_FOUND:
219 return "environment is not configured in our system";
220 case DISTR_ENVIRONMENT_SENT_IS_INVALID:
221 return "environment name is invalid";
228 public StorageOperationStatus isEnvironmentAvailable(String envName) {
229 if (envName == null || envName.isEmpty()) {
230 return StorageOperationStatus.DISTR_ENVIRONMENT_SENT_IS_INVALID;
232 AtomicBoolean status = envNamePerStatus.get(envName);
233 if (status == null) {
234 return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_FOUND;
237 return StorageOperationStatus.DISTR_ENVIRONMENT_NOT_AVAILABLE;
239 return StorageOperationStatus.OK;
243 public StorageOperationStatus isEnvironmentAvailable() {
244 String envName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration().getEnvironments().get(0);
245 return isEnvironmentAvailable(envName);
249 public void disableEnvironment(String envName) {
250 AtomicBoolean status = envNamePerStatus.get(envName);
255 public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envName, User modifier) {
256 return notifyService(distributionId, service, notificationData, envName, envName, modifier);
// Sends the distribution notification for a service to the environment's notification topic.
// The environment is looked up by envId; when found it is wrapped in EnvironmentMessageBusData and
// handed to distributionNotificationSender.sendNotification, otherwise the result is
// DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE.
// NOTE(review): the end of the parameter list and the logger call that opens the first log
// statement are not visible in this view.
// NOTE(review): the "modifierName {}" placeholder receives the whole modifier object - confirm
// that this is the intended log content.
260 public ActionStatus notifyService(String distributionId, Service service, INotificationData notificationData, String envId, String envName,
263 "Received notify service request. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}",
264 distributionId, service.getUUID(), service.getUniqueId(), envName, service.getLastUpdaterUserId(), modifier);
// The topic name is derived from the environment name, not from the environment id.
265 String topicName = buildTopicName(envName);
// Optional.map yields an empty Optional when the lookup or sendNotification returns null, so both
// cases end in the orElse default.
266 ActionStatus notifyServiceStatus = Optional.ofNullable(environmentsEngine.getEnvironmentById(envId)).map(EnvironmentMessageBusData::new).map(
267 messageBusData -> distributionNotificationSender
268 .sendNotification(topicName, distributionId, messageBusData, notificationData, service, modifier))
269 .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE);
270 logger.debug("Finish notifyService. status is {}", notifyServiceStatus);
271 return notifyServiceStatus;
274 private String buildTopicName(String envName) {
275 DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
276 String distributionNotifTopicName = deConfiguration.getDistributionNotifTopicName();
277 return DistributionEngineInitTask.buildTopicName(distributionNotifTopicName, envName);
281 public StorageOperationStatus isReadyForDistribution(String envName) {
282 StorageOperationStatus status = isEnvironmentAvailable(envName);
283 if (status != StorageOperationStatus.OK) {
284 String envErrorDec = getEnvironmentErrorDescription(status);
285 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(DistributionNotificationSender.DISTRIBUTION_NOTIFICATION_SENDING,
286 "Environment name " + envName + " is not available. Reason : " + envErrorDec);
292 public OperationalEnvironmentEntry getEnvironmentById(String opEnvId) {
293 return environmentsEngine.getEnvironmentById(opEnvId);
297 public OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
298 return environmentsEngine.getEnvironmentByDmaapUebAddress(dmaapUebAddress);
302 public INotificationData buildServiceForDistribution(Service service, String distributionId, String workloadContext) {
303 INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service, distributionId, workloadContext);
304 value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service);