2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.components.distribution.engine;
23 import com.att.aft.dme2.api.DME2Exception;
24 import com.att.aft.dme2.iterator.DME2EndpointIterator;
25 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
26 import com.att.aft.dme2.manager.registry.DME2Endpoint;
27 import com.att.nsa.apiClient.credentials.ApiCredential;
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.gson.Gson;
30 import com.google.gson.GsonBuilder;
31 import fj.data.Either;
32 import org.apache.commons.lang3.StringUtils;
33 import org.apache.http.HttpStatus;
34 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
35 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
36 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
37 import org.openecomp.sdc.be.components.impl.exceptions.ByActionStatusComponentException;
38 import org.openecomp.sdc.be.config.ConfigurationManager;
39 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
40 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
41 import org.openecomp.sdc.be.dao.api.ActionStatus;
42 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
43 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
44 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
45 import org.openecomp.sdc.be.impl.ComponentsUtils;
46 import org.openecomp.sdc.be.info.OperationalEnvInfo;
47 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
48 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
49 import org.openecomp.sdc.common.datastructure.Wrapper;
50 import org.openecomp.sdc.common.http.client.api.HttpResponse;
51 import org.openecomp.sdc.common.log.elements.LogFieldsMdcHandler;
52 import org.openecomp.sdc.common.log.wrappers.Logger;
53 import org.springframework.stereotype.Service;
55 import javax.annotation.PostConstruct;
56 import java.util.Date;
57 import java.util.HashMap;
58 import java.util.HashSet;
59 import java.util.LinkedList;
60 import java.util.List;
63 import java.util.concurrent.atomic.AtomicBoolean;
64 import java.util.function.Function;
65 import java.util.function.Supplier;
66 import java.util.stream.Collectors;
68 import static org.apache.commons.lang3.StringUtils.isEmpty;
69 import static org.glassfish.jersey.internal.guava.Predicates.not;
70 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
73 * Allows to consume DMAAP topic and handle received notifications
76 public class EnvironmentsEngine implements INotificationHandler {
78 private static final String MESSAGE_BUS = "MessageBus";
79 private static final String UNKNOWN = "Unknown";
80 private static final Logger log = Logger.getLogger(EnvironmentsEngine.class.getName());
81 private static final String LOG_PARTNER_NAME = "SDC.BE";
82 private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
84 private Map<String, OperationalEnvironmentEntry> environments = new HashMap<>();
85 private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
86 private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
87 private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
89 private final DmaapConsumer dmaapConsumer;
90 private final OperationalEnvironmentDao operationalEnvironmentDao;
91 private final DME2EndpointIteratorCreator epIterCreator;
92 private final AaiRequestHandler aaiRequestHandler;
93 private final ComponentsUtils componentUtils;
94 private final CambriaHandler cambriaHandler;
95 private final DistributionEngineClusterHealth distributionEngineClusterHealth;
96 private final DistributionCompleteReporter distributionCompleteReporter;
97 private static LogFieldsMdcHandler mdcFieldsHandler = new LogFieldsMdcHandler();
99 public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) {
100 this.dmaapConsumer = dmaapConsumer;
101 this.operationalEnvironmentDao = operationalEnvironmentDao;
102 this.epIterCreator = epIterCreator;
103 this.aaiRequestHandler = aaiRequestHandler;
104 this.componentUtils = componentUtils;
105 this.cambriaHandler = cambriaHandler;
106 this.distributionEngineClusterHealth = distributionEngineClusterHealth;
107 this.distributionCompleteReporter = distributionCompleteReporter;
114 mdcFieldsHandler.addInfoForErrorAndDebugLogging(LOG_PARTNER_NAME);
115 environments = populateEnvironments();
116 createUebTopicsForEnvironments();
117 initDmeGlobalConfig();
118 if(!configurationManager.getConfiguration().getDmaapConsumerConfiguration().isActive()){
119 log.info("Environments engine is disabled");
122 dmaapConsumer.consumeDmaapTopic(this::handleMessage,
123 (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
124 log.info("Environments engine has been initialized.");
125 } catch (Exception e) {
126 log.error("An error occurred upon consuming topic by Dmaap consumer client.", e);
130 private void initDmeGlobalConfig() {
131 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
132 if (dmaapConsumerParams == null) {
133 log.warn("cannot read dmaap configuration file,DME might not be initialized properly");
136 System.setProperty("AFT_ENVIRONMENT", dmaapConsumerParams.getAftEnvironment()); // AFTPRD for production
137 System.setProperty("AFT_LATITUDE", dmaapConsumerParams.getLatitude() != null ? dmaapConsumerParams.getLatitude().toString() : "1.0"); // Replace with actual latitude
138 System.setProperty("AFT_LONGITUDE", dmaapConsumerParams.getLongitude() != null ? dmaapConsumerParams.getLongitude().toString() : "1.0"); // Replace with actual longitude
141 public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
142 AtomicBoolean status,
143 Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
144 connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
148 public void connectUebTopicForDistributionConfTopic(String envName,
149 AtomicBoolean status,
150 Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
151 connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
156 * Allows to create and run UEB initializing and polling tasks
159 * @param envNamePerInitTask
160 * @param envNamePerPollingTask
163 private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
164 Map<String, DistributionEngineInitTask> envNamePerInitTask,
165 Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
167 String envId = opEnvEntry.getEnvironmentId();
169 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
170 .getDistributionEngineConfiguration();
171 DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(
172 distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth,
174 String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
175 DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l,
176 distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask,
178 distributionEngineInitTask.startTask();
179 envNamePerInitTask.put(envId, distributionEngineInitTask);
180 envNamePerPollingTask.put(envId, distributionEnginePollingTask);
182 log.debug("Environment envId = {} has been connected to the UEB topic", envId);
186 public boolean handleMessage(String notification) {
187 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager()
188 .getConfiguration().getDmaapConsumerConfiguration();
189 Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
190 Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier,
191 dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
194 if (eitherTimeOut.isRight()) {
197 result = eitherTimeOut.left().value();
202 public boolean handleMessageLogic(String notification) {
203 Wrapper<Boolean> errorWrapper = new Wrapper<>();
204 Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
207 log.debug("handle message - for operational environment notification received: {}", notification);
208 Gson gsonObj = new GsonBuilder().create();
210 IDmaapNotificationData notificationData = gsonObj.fromJson(notification,
211 DmaapNotificationDataImpl.class);
212 IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification,
213 DmaapNotificationDataImpl.class);
215 AuditingActionEnum actionEnum;
216 switch (notificationData.getAction()) {
218 actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
221 actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
224 actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
227 actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
230 componentUtils.auditEnvironmentEngine(actionEnum,
231 notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
232 notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
233 auditNotificationData.getTenantContext());
235 if (errorWrapper.isEmpty()) {
236 validateNotification(errorWrapper, notificationData, auditNotificationData);
238 // Perform Save In-Progress Dao
239 if (errorWrapper.isEmpty()) {
240 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
243 if (errorWrapper.isEmpty()) {
244 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
247 } catch (Exception e) {
248 log.debug("handle message for operational environment failed for notification: {} with error :{}",
249 notification, e.getMessage(), e);
250 errorWrapper.setInnerElement(false);
253 return errorWrapper.isEmpty();
256 private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
257 IDmaapAuditNotificationData auditNotificationData) {
258 // Check OperationaEnvironmentType
259 if (errorWrapper.isEmpty()) {
260 validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
263 if (errorWrapper.isEmpty()) {
264 validateActionType(errorWrapper, notificationData);
266 // Check is valid for create/update (not In-Progress state)
267 if (errorWrapper.isEmpty()) {
268 validateState(errorWrapper, notificationData);
272 public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
273 // Get Env Info From A&AI
274 if (errorWrapper.isEmpty()) {
275 retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
278 if (errorWrapper.isEmpty()) {
279 // Get List Of UEB Addresses From AFT_DME
280 retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
283 // Create UEB keys and set them on EnvEntry
284 if (errorWrapper.isEmpty()) {
285 createUebKeys(errorWrapper, opEnvEntry);
289 if (errorWrapper.isEmpty()) {
290 log.debug("handle message - Create Topics");
291 createUebTopicsForEnvironment(opEnvEntry);
294 // Save Status Complete and Add to Map
295 if (errorWrapper.isEmpty()) {
296 saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
299 // Update Environments Map
300 if (errorWrapper.isEmpty()) {
301 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
303 saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
307 private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
308 log.debug("handle message - save OperationalEnvironment Failed Status");
309 opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
310 saveOpEnvEntry(errorWrapper, opEnvEntry);
313 void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
314 log.debug("handle message - save OperationalEnvironment Complete Dao");
315 opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
316 saveOpEnvEntry(errorWrapper, opEnvEntry);
320 void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
321 log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
322 log.invoke(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), EnvironmentsEngine.class.getName(), errorWrapper.toString() );
324 boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
325 if (isKeyFieldsValid) {
326 String opEnvKey = map2OpEnvKey(opEnvEntry);
327 List<String> uebHosts = discoverUebHosts(opEnvKey);
328 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
329 log.invokeReturn(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), "SDC-BE", errorWrapper.toString() );
331 errorWrapper.setInnerElement(false);
332 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
335 } catch (Exception e) {
336 errorWrapper.setInnerElement(false);
337 log.error("Failed to retrieve Ueb Addresses From DME. ", e);
341 void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
342 log.debug("handle message - Create UEB keys");
343 List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream()
344 .collect(Collectors.toList());
345 Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler
346 .createUebKeys(discoverEndPoints);
347 if (eitherCreateUebKeys.isRight()) {
348 errorWrapper.setInnerElement(false);
349 log.debug("handle message - failed to create UEB Keys");
351 ApiCredential apiCredential = eitherCreateUebKeys.left().value();
352 opEnvEntry.setUebApikey(apiCredential.getApiKey());
353 opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
357 void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
358 log.debug("handle message - Get Env Info From A&AI");
359 Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(
360 opEnvEntry.getEnvironmentId());
361 if (eitherOperationalEnvInfo.isRight()) {
362 errorWrapper.setInnerElement(false);
363 log.debug("handle message - failed to retrieve details from A&AI");
365 OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
366 opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
367 opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
371 void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) {
372 log.debug("handle message - save OperationalEnvironment In-Progress Dao");
373 OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
374 // Entry Environment ID holds actually the environment NAME
375 opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
376 opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
377 opEnvEntry.setIsProduction(false);
378 saveOpEnvEntry(errorWrapper, opEnvEntry);
379 opEnvEntryWrapper.setInnerElement(opEnvEntry);
384 void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
385 log.debug("handle message - verify OperationalEnvironment not In-Progress");
386 String opEnvId = notificationData.getOperationalEnvironmentId();
388 Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao
390 if (eitherOpEnv.isLeft()) {
391 final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
392 if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
393 errorWrapper.setInnerElement(false);
394 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
397 CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
398 if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
399 errorWrapper.setInnerElement(false);
400 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId,
407 void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
408 log.debug("handle message - verify Action Type");
409 DmaapActionEnum action = notificationData.getAction();
410 if (action == DmaapActionEnum.DELETE) {
411 errorWrapper.setInnerElement(false);
412 log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
416 void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
417 IDmaapAuditNotificationData auditNotificationData) {
418 log.debug("handle message - verify OperationaEnvironmentType");
419 OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
420 if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
421 errorWrapper.setInnerElement(false);
422 log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
423 componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE,
424 notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
425 notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
426 auditNotificationData.getTenantContext());
431 private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
432 entry.setLastModified(new Date(System.currentTimeMillis()));
433 CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
434 if (saveStaus != CassandraOperationStatus.OK) {
435 errorWrapper.setInnerElement(false);
436 log.debug("handle message saving operational environmet failed for id :{} with error : {}",
437 entry.getEnvironmentId(), saveStaus);
441 public List<String> discoverUebHosts(String opEnvKey) throws DME2Exception {
442 String lookupUriFormat = configurationManager.getConfiguration().getDmeConfiguration().getLookupUriFormat();
443 String environment = configurationManager.getConfiguration().getDmaapConsumerConfiguration().getEnvironment();
444 String lookupURI = String.format(lookupUriFormat, opEnvKey, environment);
445 log.debug("DME2 GRM URI: {}", lookupURI);
447 List<String> uebHosts = new LinkedList<>();
448 DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
449 // Beginning iteration
450 while (iterator.hasNext()) {
451 DME2EndpointReference ref = iterator.next();
452 DME2Endpoint dmeEndpoint = ref.getEndpoint();
453 log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
454 uebHosts.add(dmeEndpoint.getHost());
460 private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
461 return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
464 private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
465 Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
466 OperationalEnvironmentEntry confEntry = readEnvFromConfig();
467 envs.put(confEntry.getEnvironmentId(), confEntry);
471 private OperationalEnvironmentEntry readEnvFromConfig() {
472 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
473 DistributionEngineConfiguration distributionEngineConfiguration = configurationManager
474 .getDistributionEngineConfiguration();
475 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
476 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
478 Set<String> puebEndpoints = new HashSet<>();
479 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
480 entry.setDmaapUebAddress(puebEndpoints);
482 String envName = distributionEngineConfiguration.getEnvironments().size() == 1
483 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
484 entry.setEnvironmentId(envName);
485 entry.setIsProduction(true);
487 if (log.isDebugEnabled()) {
488 log.debug("Enviroment read from configuration: {}", entry);
494 private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
495 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
496 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
498 if (opEnvResult.isLeft()) {
499 Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
500 .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
501 resultMap.forEach((key, value) -> log.debug("Enviroment loaded from DB: {}", value));
504 CassandraOperationStatus status = opEnvResult.right().value();
505 log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
506 return new HashMap<>();
510 void createUebTopicsForEnvironments() {
511 environments.values().stream()
512 .filter(not(OperationalEnvironmentEntry::getIsProduction))
513 .forEach(this::createUebTopicsForEnvironment);
516 public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
517 String envId = opEnvEntry.getEnvironmentId();
518 log.debug("Create Environment {} on UEB Topic.", envId);
519 AtomicBoolean status = new AtomicBoolean(false);
520 envNamePerStatus.put(envId, status);
522 connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
526 void setConfigurationManager(ConfigurationManager configurationManager) {
527 this.configurationManager = configurationManager;
530 public Map<String, OperationalEnvironmentEntry> getEnvironments() {
534 public OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
535 return environments.values().stream()
536 .filter(e -> e.getDmaapUebAddress().stream()
537 .filter(dmaapUebAddress::contains).findAny().isPresent())
539 .orElseThrow(() -> new ByActionStatusComponentException(ActionStatus.DISTRIBUTION_ENV_DOES_NOT_EXIST,dmaapUebAddress.toString()));
544 public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
545 HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
546 if (resp.getStatusCode() == HttpStatus.SC_OK) {
548 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
550 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
551 return Either.left(operationalEnvInfo);
552 } catch (Exception e) {
553 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
554 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
557 log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id,
558 resp.getStatusCode(), resp.getDescription());
559 return Either.right(resp.getStatusCode());
563 public OperationalEnvironmentEntry getEnvironmentById(String envId) {
564 return environments.get(envId);
567 public boolean isInMap(OperationalEnvironmentEntry env) {
568 return isInMap(env.getEnvironmentId());
571 public boolean isInMap(String envId) {
572 return environments.containsKey(envId);
575 public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
576 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);