2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.distribution.engine;
22 import static org.apache.commons.lang3.StringUtils.isEmpty;
23 import static org.glassfish.jersey.internal.guava.Predicates.not;
24 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
26 import com.att.aft.dme2.api.DME2Exception;
27 import com.att.aft.dme2.iterator.DME2EndpointIterator;
28 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
29 import com.att.aft.dme2.manager.registry.DME2Endpoint;
30 import com.att.nsa.apiClient.credentials.ApiCredential;
31 import com.google.common.annotations.VisibleForTesting;
32 import com.google.gson.Gson;
33 import com.google.gson.GsonBuilder;
34 import fj.data.Either;
35 import java.util.Date;
36 import java.util.HashMap;
37 import java.util.HashSet;
38 import java.util.LinkedList;
39 import java.util.List;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.function.Function;
44 import java.util.function.Supplier;
45 import java.util.stream.Collectors;
46 import javax.annotation.PostConstruct;
47 import org.apache.commons.lang3.StringUtils;
48 import org.apache.http.HttpStatus;
49 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
50 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
51 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
52 import org.openecomp.sdc.be.components.impl.exceptions.ByActionStatusComponentException;
53 import org.openecomp.sdc.be.config.ConfigurationManager;
54 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
55 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
56 import org.openecomp.sdc.be.dao.api.ActionStatus;
57 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
58 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
59 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
60 import org.openecomp.sdc.be.impl.ComponentsUtils;
61 import org.openecomp.sdc.be.info.OperationalEnvInfo;
62 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
63 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
64 import org.openecomp.sdc.common.datastructure.Wrapper;
65 import org.openecomp.sdc.common.http.client.api.HttpResponse;
66 import org.openecomp.sdc.common.log.elements.LogFieldsMdcHandler;
67 import org.openecomp.sdc.common.log.wrappers.Logger;
68 import org.springframework.stereotype.Service;
/**
 * Consumes the DMaaP topic and handles the notifications received on it.
 */
74 public class EnvironmentsEngine implements INotificationHandler {
76 private static final String MESSAGE_BUS = "MessageBus";
77 private static final String UNKNOWN = "Unknown";
78 private static final Logger log = Logger.getLogger(EnvironmentsEngine.class.getName());
79 private static final String LOG_PARTNER_NAME = "SDC.BE";
80 private static LogFieldsMdcHandler mdcFieldsHandler = new LogFieldsMdcHandler();
81 private final DmaapConsumer dmaapConsumer;
82 private final OperationalEnvironmentDao operationalEnvironmentDao;
83 private final DME2EndpointIteratorCreator epIterCreator;
84 private final AaiRequestHandler aaiRequestHandler;
85 private final ComponentsUtils componentUtils;
86 private final CambriaHandler cambriaHandler;
87 private final DistributionEngineClusterHealth distributionEngineClusterHealth;
88 private final DistributionCompleteReporter distributionCompleteReporter;
89 private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
90 private Map<String, OperationalEnvironmentEntry> environments = new HashMap<>();
91 private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
92 private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
93 private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
/**
 * Creates the engine with its collaborators. Only stores the references; no work is started here.
 *
 * @param dmaapConsumer                   consumer used to subscribe to the DMaaP topic
 * @param operationalEnvironmentDao       DAO used to read and save operational environment entries
 * @param epIterCreator                   factory of DME2 endpoint iterators used to discover UEB hosts
 * @param aaiRequestHandler               handler used to fetch operational environment details by id
 * @param componentUtils                  utilities used for auditing and handed to the distribution tasks
 * @param cambriaHandler                  handler used to create UEB keys
 * @param distributionEngineClusterHealth health object handed to the polling tasks
 * @param distributionCompleteReporter    reporter handed to the polling tasks
 */
public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao,
                          DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils,
                          CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth,
                          DistributionCompleteReporter distributionCompleteReporter) {
    this.dmaapConsumer = dmaapConsumer;
    this.operationalEnvironmentDao = operationalEnvironmentDao;
    this.epIterCreator = epIterCreator;
    this.aaiRequestHandler = aaiRequestHandler;
    this.componentUtils = componentUtils;
    this.cambriaHandler = cambriaHandler;
    this.distributionEngineClusterHealth = distributionEngineClusterHealth;
    this.distributionCompleteReporter = distributionCompleteReporter;
}
113 mdcFieldsHandler.addInfoForErrorAndDebugLogging(LOG_PARTNER_NAME);
114 environments = populateEnvironments();
115 createUebTopicsForEnvironments();
116 initDmeGlobalConfig();
117 if (!configurationManager.getConfiguration().getDmaapConsumerConfiguration().isActive()) {
118 log.info("Environments engine is disabled");
122 .consumeDmaapTopic(this::handleMessage, (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
123 log.info("Environments engine has been initialized.");
124 } catch (Exception e) {
125 log.error("An error occurred upon consuming topic by Dmaap consumer client.", e);
129 private void initDmeGlobalConfig() {
130 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration()
131 .getDmaapConsumerConfiguration();
132 if (dmaapConsumerParams == null) {
133 log.warn("cannot read dmaap configuration file,DME might not be initialized properly");
136 System.setProperty("AFT_ENVIRONMENT", dmaapConsumerParams.getAftEnvironment()); // AFTPRD for production
138 System.setProperty("AFT_LATITUDE",
139 dmaapConsumerParams.getLatitude() != null ? dmaapConsumerParams.getLatitude().toString() : "1.0"); // Replace with actual latitude
141 System.setProperty("AFT_LONGITUDE",
142 dmaapConsumerParams.getLongitude() != null ? dmaapConsumerParams.getLongitude().toString() : "1.0"); // Replace with actual longitude
145 public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
146 Map<String, DistributionEngineInitTask> envNamePerInitTask,
147 Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
148 connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
151 public void connectUebTopicForDistributionConfTopic(String envName, AtomicBoolean status,
152 Map<String, DistributionEngineInitTask> envNamePerInitTask,
153 Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
154 connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
158 * Allows to create and run UEB initializing and polling tasks
161 * @param envNamePerInitTask
162 * @param envNamePerPollingTask
165 private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
166 Map<String, DistributionEngineInitTask> envNamePerInitTask,
167 Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
168 String envId = opEnvEntry.getEnvironmentId();
169 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
170 .getDistributionEngineConfiguration();
171 DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(distributionEngineConfiguration,
172 distributionCompleteReporter, componentUtils, distributionEngineClusterHealth, opEnvEntry);
173 String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
174 DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l, distributionEngineConfiguration, envName, status,
175 componentUtils, distributionEnginePollingTask, opEnvEntry);
176 distributionEngineInitTask.startTask();
177 envNamePerInitTask.put(envId, distributionEngineInitTask);
178 envNamePerPollingTask.put(envId, distributionEnginePollingTask);
179 log.debug("Environment envId = {} has been connected to the UEB topic", envId);
183 public boolean handleMessage(String notification) {
184 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration()
185 .getDmaapConsumerConfiguration();
186 Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
187 Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier, dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
189 if (eitherTimeOut.isRight()) {
192 result = eitherTimeOut.left().value();
197 public boolean handleMessageLogic(String notification) {
198 Wrapper<Boolean> errorWrapper = new Wrapper<>();
199 Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
201 log.debug("handle message - for operational environment notification received: {}", notification);
202 Gson gsonObj = new GsonBuilder().create();
203 IDmaapNotificationData notificationData = gsonObj.fromJson(notification, DmaapNotificationDataImpl.class);
204 IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification, DmaapNotificationDataImpl.class);
205 AuditingActionEnum actionEnum;
206 switch (notificationData.getAction()) {
208 actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
211 actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
214 actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
217 actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
220 componentUtils.auditEnvironmentEngine(actionEnum, notificationData.getOperationalEnvironmentId(),
221 notificationData.getOperationalEnvironmentType().getEventTypenName(), notificationData.getAction().getActionName(),
222 auditNotificationData.getOperationalEnvironmentName(), auditNotificationData.getTenantContext());
223 if (errorWrapper.isEmpty()) {
224 validateNotification(errorWrapper, notificationData, auditNotificationData);
226 // Perform Save In-Progress Dao
227 if (errorWrapper.isEmpty()) {
228 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
230 if (errorWrapper.isEmpty()) {
231 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
233 } catch (Exception e) {
234 log.debug("handle message for operational environment failed for notification: {} with error :{}", notification, e.getMessage(), e);
235 errorWrapper.setInnerElement(false);
237 return errorWrapper.isEmpty();
240 private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
241 IDmaapAuditNotificationData auditNotificationData) {
242 // Check OperationaEnvironmentType
243 if (errorWrapper.isEmpty()) {
244 validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
247 if (errorWrapper.isEmpty()) {
248 validateActionType(errorWrapper, notificationData);
250 // Check is valid for create/update (not In-Progress state)
251 if (errorWrapper.isEmpty()) {
252 validateState(errorWrapper, notificationData);
256 public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
257 // Get Env Info From A&AI
258 if (errorWrapper.isEmpty()) {
259 retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
261 if (errorWrapper.isEmpty()) {
262 // Get List Of UEB Addresses From AFT_DME
263 retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
265 // Create UEB keys and set them on EnvEntry
266 if (errorWrapper.isEmpty()) {
267 createUebKeys(errorWrapper, opEnvEntry);
270 if (errorWrapper.isEmpty()) {
271 log.debug("handle message - Create Topics");
272 createUebTopicsForEnvironment(opEnvEntry);
274 // Save Status Complete and Add to Map
275 if (errorWrapper.isEmpty()) {
276 saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
278 // Update Environments Map
279 if (errorWrapper.isEmpty()) {
280 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
282 saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
286 private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
287 log.debug("handle message - save OperationalEnvironment Failed Status");
288 opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
289 saveOpEnvEntry(errorWrapper, opEnvEntry);
292 void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
293 log.debug("handle message - save OperationalEnvironment Complete Dao");
294 opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
295 saveOpEnvEntry(errorWrapper, opEnvEntry);
298 void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
299 log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
300 log.invoke(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), EnvironmentsEngine.class.getName(),
301 errorWrapper.toString());
303 boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
304 if (isKeyFieldsValid) {
305 String opEnvKey = map2OpEnvKey(opEnvEntry);
306 List<String> uebHosts = discoverUebHosts(opEnvKey);
307 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
308 log.invokeReturn(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), "SDC-BE",
309 errorWrapper.toString());
311 errorWrapper.setInnerElement(false);
312 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
314 } catch (Exception e) {
315 errorWrapper.setInnerElement(false);
316 log.error("Failed to retrieve Ueb Addresses From DME. ", e);
320 void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
321 log.debug("handle message - Create UEB keys");
322 List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream().collect(Collectors.toList());
323 Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler.createUebKeys(discoverEndPoints);
324 if (eitherCreateUebKeys.isRight()) {
325 errorWrapper.setInnerElement(false);
326 log.debug("handle message - failed to create UEB Keys");
328 ApiCredential apiCredential = eitherCreateUebKeys.left().value();
329 opEnvEntry.setUebApikey(apiCredential.getApiKey());
330 opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
334 void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
335 log.debug("handle message - Get Env Info From A&AI");
336 Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(opEnvEntry.getEnvironmentId());
337 if (eitherOperationalEnvInfo.isRight()) {
338 errorWrapper.setInnerElement(false);
339 log.debug("handle message - failed to retrieve details from A&AI");
341 OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
342 opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
343 opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
347 void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper,
348 IDmaapNotificationData notificationData) {
349 log.debug("handle message - save OperationalEnvironment In-Progress Dao");
350 OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
351 // Entry Environment ID holds actually the environment NAME
352 opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
353 opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
354 opEnvEntry.setIsProduction(false);
355 saveOpEnvEntry(errorWrapper, opEnvEntry);
356 opEnvEntryWrapper.setInnerElement(opEnvEntry);
359 void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
360 log.debug("handle message - verify OperationalEnvironment not In-Progress");
361 String opEnvId = notificationData.getOperationalEnvironmentId();
362 Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao.get(opEnvId);
363 if (eitherOpEnv.isLeft()) {
364 final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
365 if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
366 errorWrapper.setInnerElement(false);
367 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
370 CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
371 if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
372 errorWrapper.setInnerElement(false);
373 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId, operationStatus);
378 void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
379 log.debug("handle message - verify Action Type");
380 DmaapActionEnum action = notificationData.getAction();
381 if (action == DmaapActionEnum.DELETE) {
382 errorWrapper.setInnerElement(false);
383 log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
387 void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
388 IDmaapAuditNotificationData auditNotificationData) {
389 log.debug("handle message - verify OperationaEnvironmentType");
390 OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
391 if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
392 errorWrapper.setInnerElement(false);
393 log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
394 componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE, notificationData.getOperationalEnvironmentId(),
395 notificationData.getOperationalEnvironmentType().getEventTypenName(), notificationData.getAction().getActionName(),
396 auditNotificationData.getOperationalEnvironmentName(), auditNotificationData.getTenantContext());
400 private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
401 entry.setLastModified(new Date(System.currentTimeMillis()));
402 CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
403 if (saveStaus != CassandraOperationStatus.OK) {
404 errorWrapper.setInnerElement(false);
405 log.debug("handle message saving operational environmet failed for id :{} with error : {}", entry.getEnvironmentId(), saveStaus);
409 public List<String> discoverUebHosts(String opEnvKey) throws DME2Exception {
410 String lookupUriFormat = configurationManager.getConfiguration().getDmeConfiguration().getLookupUriFormat();
411 String environment = configurationManager.getConfiguration().getDmaapConsumerConfiguration().getEnvironment();
412 String lookupURI = String.format(lookupUriFormat, opEnvKey, environment);
413 log.debug("DME2 GRM URI: {}", lookupURI);
414 List<String> uebHosts = new LinkedList<>();
415 DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
416 // Beginning iteration
417 while (iterator.hasNext()) {
418 DME2EndpointReference ref = iterator.next();
419 DME2Endpoint dmeEndpoint = ref.getEndpoint();
420 log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
421 uebHosts.add(dmeEndpoint.getHost());
426 private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
427 return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
430 private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
431 Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
432 OperationalEnvironmentEntry confEntry = readEnvFromConfig();
433 envs.put(confEntry.getEnvironmentId(), confEntry);
437 private OperationalEnvironmentEntry readEnvFromConfig() {
438 OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
439 DistributionEngineConfiguration distributionEngineConfiguration = configurationManager.getDistributionEngineConfiguration();
440 entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
441 entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
442 Set<String> puebEndpoints = new HashSet<>();
443 puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
444 entry.setDmaapUebAddress(puebEndpoints);
446 distributionEngineConfiguration.getEnvironments().size() == 1 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
447 entry.setEnvironmentId(envName);
448 entry.setIsProduction(true);
449 if (log.isDebugEnabled()) {
450 log.debug("Enviroment read from configuration: {}", entry);
455 private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
456 Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
457 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
458 if (opEnvResult.isLeft()) {
459 Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
460 .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
461 resultMap.forEach((key, value) -> log.debug("Enviroment loaded from DB: {}", value));
464 CassandraOperationStatus status = opEnvResult.right().value();
465 log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
466 return new HashMap<>();
470 void createUebTopicsForEnvironments() {
471 environments.values().stream().filter(not(OperationalEnvironmentEntry::getIsProduction)).forEach(this::createUebTopicsForEnvironment);
474 public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
475 String envId = opEnvEntry.getEnvironmentId();
476 log.debug("Create Environment {} on UEB Topic.", envId);
477 AtomicBoolean status = new AtomicBoolean(false);
478 envNamePerStatus.put(envId, status);
479 connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
483 void setConfigurationManager(ConfigurationManager configurationManager) {
484 this.configurationManager = configurationManager;
487 public Map<String, OperationalEnvironmentEntry> getEnvironments() {
491 public OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
492 return environments.values().stream().filter(e -> e.getDmaapUebAddress().stream().filter(dmaapUebAddress::contains).findAny().isPresent())
494 .orElseThrow(() -> new ByActionStatusComponentException(ActionStatus.DISTRIBUTION_ENV_DOES_NOT_EXIST, dmaapUebAddress.toString()));
497 public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
498 HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
499 if (resp.getStatusCode() == HttpStatus.SC_OK) {
501 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
502 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
503 return Either.left(operationalEnvInfo);
504 } catch (Exception e) {
505 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
506 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
509 log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id, resp.getStatusCode(),
510 resp.getDescription());
511 return Either.right(resp.getStatusCode());
515 public OperationalEnvironmentEntry getEnvironmentById(String envId) {
516 return environments.get(envId);
519 public boolean isInMap(OperationalEnvironmentEntry env) {
520 return isInMap(env.getEnvironmentId());
523 public boolean isInMap(String envId) {
524 return environments.containsKey(envId);
527 public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
528 environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);