1 package org.openecomp.sdc.be.components.distribution.engine;
3 import com.att.aft.dme2.api.DME2Exception;
4 import com.att.aft.dme2.iterator.DME2EndpointIterator;
5 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
6 import com.att.aft.dme2.manager.registry.DME2Endpoint;
7 import com.att.nsa.apiClient.credentials.ApiCredential;
8 import com.google.common.annotations.VisibleForTesting;
9 import com.google.gson.Gson;
10 import com.google.gson.GsonBuilder;
11 import fj.data.Either;
12 import org.apache.commons.lang3.StringUtils;
13 import org.apache.http.HttpStatus;
14 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
15 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
16 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
17 import org.openecomp.sdc.be.config.ConfigurationManager;
18 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
19 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
20 import org.openecomp.sdc.be.config.DmeConfiguration;
21 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
22 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
23 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
24 import org.openecomp.sdc.be.impl.ComponentsUtils;
25 import org.openecomp.sdc.be.info.OperationalEnvInfo;
26 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
27 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
28 import org.openecomp.sdc.common.datastructure.Wrapper;
29 import org.openecomp.sdc.common.http.client.api.HttpResponse;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.springframework.stereotype.Service;
34 import javax.annotation.PostConstruct;
35 import java.lang.Thread.UncaughtExceptionHandler;
36 import java.util.Date;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.LinkedList;
40 import java.util.List;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.function.Function;
45 import java.util.function.Supplier;
46 import java.util.stream.Collectors;
48 import static org.apache.commons.lang3.StringUtils.isEmpty;
49 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
52 * Allows to consume DMAAP topic and handle received notifications
55 public class EnvironmentsEngine implements INotificationHandler {
57 private static final String MESSAGE_BUS = "MessageBus";
58 private static final String UNKNOWN = "Unknown";
59 private static final Logger log = LoggerFactory.getLogger(EnvironmentsEngine.class);
60 private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
62 private Map<String, OperationalEnvironmentEntry> environments;
63 private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
64 private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
65 private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
67 private final DmaapConsumer dmaapConsumer;
68 private final OperationalEnvironmentDao operationalEnvironmentDao;
69 private final DME2EndpointIteratorCreator epIterCreator;
70 private final AaiRequestHandler aaiRequestHandler;
71 private final ComponentsUtils componentUtils;
72 private final CambriaHandler cambriaHandler;
73 private final DistributionEngineClusterHealth distributionEngineClusterHealth;
74 private final DistributionCompleteReporter distributionCompleteReporter;
76 public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) {
77 this.dmaapConsumer = dmaapConsumer;
78 this.operationalEnvironmentDao = operationalEnvironmentDao;
79 this.epIterCreator = epIterCreator;
80 this.aaiRequestHandler = aaiRequestHandler;
81 this.componentUtils = componentUtils;
82 this.cambriaHandler = cambriaHandler;
83 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
84 this.distributionCompleteReporter = distributionCompleteReporter;
90 log.trace("Environments engine has been initialized. ");
92 environments = populateEnvironments();
93 createUebTopicsForEnvironments();
94 dmaapConsumer.consumeDmaapTopic(this::handleMessage,
95 (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
98 log.error("An error occurred upon consuming topic by Dmaap consumer client." , e);
101 public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
102 AtomicBoolean status,
103 Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
104 connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
108 public void connectUebTopicForDistributionConfTopic(String envName,
109 AtomicBoolean status,
110 Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
111 connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
115 * Allows to create and run UEB initializing and polling tasks
117 * @param envNamePerInitTask
118 * @param envNamePerPollingTask
121 private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
122 Map<String, DistributionEngineInitTask> envNamePerInitTask,
123 Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
125 String envId = opEnvEntry.getEnvironmentId();
127 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
128 .getDistributionEngineConfiguration();
129 DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(
130 distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth,
132 String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
133 DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l,
134 distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask,
136 distributionEngineInitTask.startTask();
137 envNamePerInitTask.put(envId, distributionEngineInitTask);
138 envNamePerPollingTask.put(envId, distributionEnginePollingTask);
140 log.debug("Environment envId = {} has been connected to the UEB topic", envId);
144 public boolean handleMessage(String notification) {
145 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager()
146 .getConfiguration().getDmaapConsumerConfiguration();
147 Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
148 Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier,
149 dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
152 if (eitherTimeOut.isRight()) {
155 result = eitherTimeOut.left().value();
160 public boolean handleMessageLogic(String notification) {
161 Wrapper<Boolean> errorWrapper = new Wrapper<>();
162 Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
165 log.debug("handle message - for operational environment notification received: {}", notification);
166 Gson gsonObj = new GsonBuilder().create();
168 IDmaapNotificationData notificationData = gsonObj.fromJson(notification,
169 DmaapNotificationDataImpl.class);
170 IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification,
171 DmaapNotificationDataImpl.class);
173 AuditingActionEnum actionEnum;
174 switch(notificationData.getAction()) {
176 actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
179 actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
182 actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
185 actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
188 componentUtils.auditEnvironmentEngine(actionEnum,
189 notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
190 notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
191 auditNotificationData.getTenantContext());
193 if (errorWrapper.isEmpty()) {
194 validateNotification(errorWrapper, notificationData, auditNotificationData);
196 // Perform Save In-Progress Dao
197 if (errorWrapper.isEmpty()) {
198 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
201 if (errorWrapper.isEmpty()) {
202 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
205 } catch (Exception e) {
206 log.debug("handle message for operational environmet failed for notification: {} with error :{}",
207 notification, e.getMessage(), e);
208 errorWrapper.setInnerElement(false);
211 return errorWrapper.isEmpty();
214 private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
215 IDmaapAuditNotificationData auditNotificationData) {
216 // Check OperationaEnvironmentType
217 if (errorWrapper.isEmpty()) {
218 validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
221 if (errorWrapper.isEmpty()) {
222 validateActionType(errorWrapper, notificationData);
224 // Check is valid for create/update (not In-Progress state)
225 if (errorWrapper.isEmpty()) {
226 validateState(errorWrapper, notificationData);
230 public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
231 // Get Env Info From A&AI
232 if (errorWrapper.isEmpty()) {
233 retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
236 if (errorWrapper.isEmpty()) {
237 // Get List Of UEB Addresses From AFT_DME
238 retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
241 // Create UEB keys and set them on EnvEntry
242 if (errorWrapper.isEmpty()) {
243 createUebKeys(errorWrapper, opEnvEntry);
247 if (errorWrapper.isEmpty()) {
248 log.debug("handle message - Create Topics");
249 createUebTopicsForEnvironment(opEnvEntry);
252 // Save Status Complete and Add to Map
253 if (errorWrapper.isEmpty()) {
254 saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
257 // Update Environments Map
258 if (errorWrapper.isEmpty()) {
259 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
262 saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
266 private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
267 log.debug("handle message - save OperationalEnvironment Failed Status");
268 opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
269 saveOpEnvEntry(errorWrapper, opEnvEntry);
272 void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
273 log.debug("handle message - save OperationalEnvironment Complete Dao");
274 opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
275 saveOpEnvEntry(errorWrapper, opEnvEntry);
279 void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
280 log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
282 boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
283 if( isKeyFieldsValid ){
284 String opEnvKey = map2OpEnvKey(opEnvEntry);
285 String environmentId = opEnvEntry.getEnvironmentId();
286 List<String> uebHosts = discoverUebHosts(opEnvKey, environmentId);
287 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
290 errorWrapper.setInnerElement(false);
291 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
294 } catch (DME2Exception e) {
295 errorWrapper.setInnerElement(false);
296 log.error("Failed to retrieve Ueb Addresses From DME. ", e);
300 void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
301 log.debug("handle message - Create UEB keys");
302 List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream()
303 .collect(Collectors.toList());
304 Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler
305 .createUebKeys(discoverEndPoints);
306 if (eitherCreateUebKeys.isRight()) {
307 errorWrapper.setInnerElement(false);
308 log.debug("handle message - failed to create UEB Keys");
310 ApiCredential apiCredential = eitherCreateUebKeys.left().value();
311 opEnvEntry.setUebApikey(apiCredential.getApiKey());
312 opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
316 void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
317 log.debug("handle message - Get Env Info From A&AI");
318 Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(
319 opEnvEntry.getEnvironmentId());
320 if (eitherOperationalEnvInfo.isRight()) {
321 errorWrapper.setInnerElement(false);
322 log.debug("handle message - failed to retrieve details from A&AI");
324 OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
325 opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
326 opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
330 void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) {
331 log.debug("handle message - save OperationalEnvironment In-Progress Dao");
332 OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
333 // Entry Environment ID holds actually the environment NAME
334 opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
335 opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
336 opEnvEntry.setIsProduction(false);
337 saveOpEnvEntry(errorWrapper, opEnvEntry);
338 opEnvEntryWrapper.setInnerElement(opEnvEntry);
343 void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
344 log.debug("handle message - verify OperationalEnvironment not In-Progress");
345 String opEnvId = notificationData.getOperationalEnvironmentId();
347 Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao
349 if (eitherOpEnv.isLeft()) {
350 final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
351 if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
352 errorWrapper.setInnerElement(false);
353 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
356 CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
357 if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
358 errorWrapper.setInnerElement(false);
359 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId,
366 void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
367 log.debug("handle message - verify Action Type");
368 DmaapActionEnum action = notificationData.getAction();
369 if (action == DmaapActionEnum.DELETE) {
370 errorWrapper.setInnerElement(false);
371 log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
375 void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
376 IDmaapAuditNotificationData auditNotificationData) {
377 log.debug("handle message - verify OperationaEnvironmentType");
378 OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
379 if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
380 errorWrapper.setInnerElement(false);
381 log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
382 componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE,
383 notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
384 notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
385 auditNotificationData.getTenantContext());
390 private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
391 entry.setLastModified(new Date(System.currentTimeMillis()));
392 CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
393 if (saveStaus != CassandraOperationStatus.OK) {
394 errorWrapper.setInnerElement(false);
395 log.debug("handle message saving operational environmet failed for id :{} with error : {}",
396 entry.getEnvironmentId(), saveStaus);
400 public List<String> discoverUebHosts(String opEnvKey, String env) throws DME2Exception {
401 DmeConfiguration dmeConfiguration = configurationManager.getConfiguration().getDmeConfiguration();
402 List<String> uebHosts = new LinkedList<>();
404 String lookupURI = String.format("http://%s/service=%s/version=1.0.0/envContext=%s/partner=*", dmeConfiguration.getDme2Search(), opEnvKey,
406 DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
408 // Beginning iteration
409 while (iterator.hasNext()) {
410 DME2EndpointReference ref = iterator.next();
411 DME2Endpoint dmeEndpoint = ref.getEndpoint();
412 log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
413 uebHosts.add(dmeEndpoint.getHost());
419 private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
420 return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
423 private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
424 Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
425 OperationalEnvironmentEntry confEntry = readEnvFromConfig();
426 envs.put(confEntry.getEnvironmentId(), confEntry);
430 private OperationalEnvironmentEntry readEnvFromConfig() {
431 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
432 DistributionEngineConfiguration distributionEngineConfiguration = configurationManager
433 .getDistributionEngineConfiguration();
434 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
435 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
437 Set<String> puebEndpoints = new HashSet<>();
438 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
439 entry.setDmaapUebAddress(puebEndpoints);
441 String envName = distributionEngineConfiguration.getEnvironments().size() == 1
442 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
443 entry.setEnvironmentId(envName);
445 if(log.isDebugEnabled()) {
446 log.debug("Enviroment read from configuration: {}", entry.toString());
452 private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
453 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
454 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
456 if (opEnvResult.isLeft()) {
457 Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
458 .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
459 resultMap.forEach( (key, value) -> log.debug("Enviroment loaded from DB: {}", value.toString()) );
462 CassandraOperationStatus status = opEnvResult.right().value();
463 log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
464 return new HashMap<>();
468 void createUebTopicsForEnvironments() {
469 environments.values().forEach(this::createUebTopicsForEnvironment);
472 public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
473 String envId = opEnvEntry.getEnvironmentId();
474 log.debug("Create Environment {} on UEB Topic.", envId);
475 AtomicBoolean status = new AtomicBoolean(false);
476 envNamePerStatus.put(envId, status);
478 connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
482 void setConfigurationManager(ConfigurationManager configurationManager) {
483 this.configurationManager = configurationManager;
486 public Map<String, OperationalEnvironmentEntry> getEnvironments() {
491 public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
492 HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
493 if (resp.getStatusCode() == HttpStatus.SC_OK) {
495 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
497 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
498 return Either.left(operationalEnvInfo);
499 } catch (Exception e) {
500 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
501 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
504 log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id,
505 resp.getStatusCode(), resp.getDescription());
506 return Either.right(resp.getStatusCode());
510 public OperationalEnvironmentEntry getEnvironmentById (String envId) {
511 return environments.get(envId);
514 public boolean isInMap(OperationalEnvironmentEntry env) {
515 return isInMap(env.getEnvironmentId());
518 public boolean isInMap(String envId) {
519 return environments.containsKey(envId);
522 public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
523 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);