1 package org.openecomp.sdc.be.components.distribution.engine;
3 import static org.apache.commons.lang3.StringUtils.isEmpty;
4 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.util.LinkedList;
10 import java.util.List;
13 import java.util.concurrent.atomic.AtomicBoolean;
14 import java.util.function.Function;
15 import java.util.function.Supplier;
16 import java.util.stream.Collectors;
18 import javax.annotation.PostConstruct;
20 import org.apache.commons.lang3.StringUtils;
21 import org.apache.http.HttpStatus;
22 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
23 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
24 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
25 import org.openecomp.sdc.be.config.ConfigurationManager;
26 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
27 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
28 import org.openecomp.sdc.be.config.DmeConfiguration;
29 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
30 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
31 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
32 import org.openecomp.sdc.be.impl.ComponentsUtils;
33 import org.openecomp.sdc.be.info.OperationalEnvInfo;
34 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
35 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
36 import org.openecomp.sdc.common.datastructure.Wrapper;
37 import org.openecomp.sdc.common.http.client.api.HttpResponse;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.stereotype.Service;
42 import com.att.aft.dme2.api.DME2Exception;
43 import com.att.aft.dme2.iterator.DME2EndpointIterator;
44 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
45 import com.att.aft.dme2.manager.registry.DME2Endpoint;
46 import com.att.nsa.apiClient.credentials.ApiCredential;
47 import com.google.common.annotations.VisibleForTesting;
48 import com.google.gson.Gson;
49 import com.google.gson.GsonBuilder;
51 import fj.data.Either;
54 * Allows to consume DMAAP topic and handle received notifications
57 public class EnvironmentsEngine implements INotificationHandler {
59 private static final String MESSAGE_BUS = "MessageBus";
60 private static final String UNKNOWN = "Unknown";
61 private static final Logger log = LoggerFactory.getLogger(EnvironmentsEngine.class);
62 private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
64 private Map<String, OperationalEnvironmentEntry> environments;
65 private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
66 private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
67 private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
69 private final DmaapConsumer dmaapConsumer;
70 private final OperationalEnvironmentDao operationalEnvironmentDao;
71 private final DME2EndpointIteratorCreator epIterCreator;
72 private final AaiRequestHandler aaiRequestHandler;
73 private final ComponentsUtils componentUtils;
74 private final CambriaHandler cambriaHandler;
75 private final DistributionEngineClusterHealth distributionEngineClusterHealth;
76 private final DistributionCompleteReporter distributionCompleteReporter;
78 public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) {
79 this.dmaapConsumer = dmaapConsumer;
80 this.operationalEnvironmentDao = operationalEnvironmentDao;
81 this.epIterCreator = epIterCreator;
82 this.aaiRequestHandler = aaiRequestHandler;
83 this.componentUtils = componentUtils;
84 this.cambriaHandler = cambriaHandler;
85 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
86 this.distributionCompleteReporter = distributionCompleteReporter;
92 log.trace("Environments engine has been initialized. ");
94 environments = populateEnvironments();
95 createUebTopicsForEnvironments();
96 dmaapConsumer.consumeDmaapTopic(this::handleMessage,
97 (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
100 log.error("An error occurred upon consuming topic by Dmaap consumer client." , e);
103 public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
104 AtomicBoolean status,
105 Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
106 connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
110 public void connectUebTopicForDistributionConfTopic(String envName,
111 AtomicBoolean status,
112 Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
113 connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
117 * Allows to create and run UEB initializing and polling tasks
119 * @param envNamePerInitTask
120 * @param envNamePerPollingTask
123 private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
124 Map<String, DistributionEngineInitTask> envNamePerInitTask,
125 Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
127 String envId = opEnvEntry.getEnvironmentId();
129 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
130 .getDistributionEngineConfiguration();
131 DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(
132 distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth,
134 String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
135 DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l,
136 distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask,
138 distributionEngineInitTask.startTask();
139 envNamePerInitTask.put(envId, distributionEngineInitTask);
140 envNamePerPollingTask.put(envId, distributionEnginePollingTask);
142 log.debug("Environment envId = {} has been connected to the UEB topic", envId);
146 public boolean handleMessage(String notification) {
147 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager()
148 .getConfiguration().getDmaapConsumerConfiguration();
149 Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
150 Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier,
151 dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
154 if (eitherTimeOut.isRight()) {
157 result = eitherTimeOut.left().value();
162 public boolean handleMessageLogic(String notification) {
163 Wrapper<Boolean> errorWrapper = new Wrapper<>();
164 Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
167 log.debug("handle message - for operational environment notification received: {}", notification);
168 Gson gsonObj = new GsonBuilder().create();
170 IDmaapNotificationData notificationData = gsonObj.fromJson(notification,
171 DmaapNotificationDataImpl.class);
172 IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification,
173 DmaapNotificationDataImpl.class);
175 AuditingActionEnum actionEnum;
176 switch(notificationData.getAction()) {
178 actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
181 actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
184 actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
187 actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
190 componentUtils.auditEnvironmentEngine(actionEnum,
191 notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
192 notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
193 auditNotificationData.getTenantContext());
195 if (errorWrapper.isEmpty()) {
196 validateNotification(errorWrapper, notificationData, auditNotificationData);
198 // Perform Save In-Progress Dao
199 if (errorWrapper.isEmpty()) {
200 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
203 if (errorWrapper.isEmpty()) {
204 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
207 } catch (Exception e) {
208 log.debug("handle message for operational environmet failed for notification: {} with error :{}",
209 notification, e.getMessage(), e);
210 errorWrapper.setInnerElement(false);
213 return errorWrapper.isEmpty();
216 private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
217 IDmaapAuditNotificationData auditNotificationData) {
218 // Check OperationaEnvironmentType
219 if (errorWrapper.isEmpty()) {
220 validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
223 if (errorWrapper.isEmpty()) {
224 validateActionType(errorWrapper, notificationData);
226 // Check is valid for create/update (not In-Progress state)
227 if (errorWrapper.isEmpty()) {
228 validateState(errorWrapper, notificationData);
232 public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
233 // Get Env Info From A&AI
234 if (errorWrapper.isEmpty()) {
235 retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
238 if (errorWrapper.isEmpty()) {
239 // Get List Of UEB Addresses From AFT_DME
240 retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
243 // Create UEB keys and set them on EnvEntry
244 if (errorWrapper.isEmpty()) {
245 createUebKeys(errorWrapper, opEnvEntry);
249 if (errorWrapper.isEmpty()) {
250 log.debug("handle message - Create Topics");
251 createUebTopicsForEnvironment(opEnvEntry);
254 // Save Status Complete and Add to Map
255 if (errorWrapper.isEmpty()) {
256 saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
259 // Update Environments Map
260 if (errorWrapper.isEmpty()) {
261 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
264 saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
268 private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
269 log.debug("handle message - save OperationalEnvironment Failed Status");
270 opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
271 saveOpEnvEntry(errorWrapper, opEnvEntry);
274 void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
275 log.debug("handle message - save OperationalEnvironment Complete Dao");
276 opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
277 saveOpEnvEntry(errorWrapper, opEnvEntry);
281 void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
282 log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
284 boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
285 if( isKeyFieldsValid ){
286 String opEnvKey = map2OpEnvKey(opEnvEntry);
287 String environmentId = opEnvEntry.getEnvironmentId();
288 List<String> uebHosts = discoverUebHosts(opEnvKey, environmentId);
289 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
292 errorWrapper.setInnerElement(false);
293 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
296 } catch (DME2Exception e) {
297 errorWrapper.setInnerElement(false);
298 log.error("Failed to retrieve Ueb Addresses From DME. ", e);
302 void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
303 log.debug("handle message - Create UEB keys");
304 List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream()
305 .collect(Collectors.toList());
306 Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler
307 .createUebKeys(discoverEndPoints);
308 if (eitherCreateUebKeys.isRight()) {
309 errorWrapper.setInnerElement(false);
310 log.debug("handle message - failed to create UEB Keys");
312 ApiCredential apiCredential = eitherCreateUebKeys.left().value();
313 opEnvEntry.setUebApikey(apiCredential.getApiKey());
314 opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
318 void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
319 log.debug("handle message - Get Env Info From A&AI");
320 Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(
321 opEnvEntry.getEnvironmentId());
322 if (eitherOperationalEnvInfo.isRight()) {
323 errorWrapper.setInnerElement(false);
324 log.debug("handle message - failed to retrieve details from A&AI");
326 OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
327 opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
328 opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
332 void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) {
333 log.debug("handle message - save OperationalEnvironment In-Progress Dao");
334 OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
335 // Entry Environment ID holds actually the environment NAME
336 opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
337 opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
338 opEnvEntry.setIsProduction(false);
339 saveOpEnvEntry(errorWrapper, opEnvEntry);
340 opEnvEntryWrapper.setInnerElement(opEnvEntry);
345 void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
346 log.debug("handle message - verify OperationalEnvironment not In-Progress");
347 String opEnvId = notificationData.getOperationalEnvironmentId();
349 Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao
351 if (eitherOpEnv.isLeft()) {
352 final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
353 if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
354 errorWrapper.setInnerElement(false);
355 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
358 CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
359 if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
360 errorWrapper.setInnerElement(false);
361 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId,
368 void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
369 log.debug("handle message - verify Action Type");
370 DmaapActionEnum action = notificationData.getAction();
371 if (action == DmaapActionEnum.DELETE) {
372 errorWrapper.setInnerElement(false);
373 log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
377 void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
378 IDmaapAuditNotificationData auditNotificationData) {
379 log.debug("handle message - verify OperationaEnvironmentType");
380 OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
381 if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
382 errorWrapper.setInnerElement(false);
383 log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
384 componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE,
385 notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
386 notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
387 auditNotificationData.getTenantContext());
392 private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
393 entry.setLastModified(new Date(System.currentTimeMillis()));
394 CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
395 if (saveStaus != CassandraOperationStatus.OK) {
396 errorWrapper.setInnerElement(false);
397 log.debug("handle message saving operational environmet failed for id :{} with error : {}",
398 entry.getEnvironmentId(), saveStaus);
402 public List<String> discoverUebHosts(String opEnvKey, String env) throws DME2Exception {
403 DmeConfiguration dmeConfiguration = configurationManager.getConfiguration().getDmeConfiguration();
404 List<String> uebHosts = new LinkedList<>();
406 String lookupURI = String.format("http://%s/service=%s/version=1.0.0/envContext=%s/partner=*", dmeConfiguration.getDme2Search(), opEnvKey,
408 DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
410 // Beginning iteration
411 while (iterator.hasNext()) {
412 DME2EndpointReference ref = iterator.next();
413 DME2Endpoint dmeEndpoint = ref.getEndpoint();
414 log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
415 uebHosts.add(dmeEndpoint.getHost());
421 private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
422 return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
425 private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
426 Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
427 OperationalEnvironmentEntry confEntry = readEnvFromConfig();
428 envs.put(confEntry.getEnvironmentId(), confEntry);
432 private OperationalEnvironmentEntry readEnvFromConfig() {
433 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
434 DistributionEngineConfiguration distributionEngineConfiguration = configurationManager
435 .getDistributionEngineConfiguration();
436 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
437 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
439 Set<String> puebEndpoints = new HashSet<>();
440 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
441 entry.setDmaapUebAddress(puebEndpoints);
443 String envName = distributionEngineConfiguration.getEnvironments().size() == 1
444 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
445 entry.setEnvironmentId(envName);
447 if(log.isDebugEnabled()) {
448 log.debug("Enviroment read from configuration: {}", entry.toString());
454 private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
455 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
456 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
458 if (opEnvResult.isLeft()) {
459 Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
460 .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
461 resultMap.forEach( (key, value) -> log.debug("Enviroment loaded from DB: {}", value.toString()) );
464 CassandraOperationStatus status = opEnvResult.right().value();
465 log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
466 return new HashMap<>();
470 void createUebTopicsForEnvironments() {
471 environments.values().forEach(this::createUebTopicsForEnvironment);
474 public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
475 String envId = opEnvEntry.getEnvironmentId();
476 log.debug("Create Environment {} on UEB Topic.", envId);
477 AtomicBoolean status = new AtomicBoolean(false);
478 envNamePerStatus.put(envId, status);
480 connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
484 void setConfigurationManager(ConfigurationManager configurationManager) {
485 this.configurationManager = configurationManager;
488 public Map<String, OperationalEnvironmentEntry> getEnvironments() {
493 public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
494 HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
495 if (resp.getStatusCode() == HttpStatus.SC_OK) {
497 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
499 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
500 return Either.left(operationalEnvInfo);
501 } catch (Exception e) {
502 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
503 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
506 log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id,
507 resp.getStatusCode(), resp.getDescription());
508 return Either.right(resp.getStatusCode());
512 public OperationalEnvironmentEntry getEnvironmentById (String envId) {
513 return environments.get(envId);
516 public boolean isInMap(OperationalEnvironmentEntry env) {
517 return isInMap(env.getEnvironmentId());
520 public boolean isInMap(String envId) {
521 return environments.containsKey(envId);
524 public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
525 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);