315ba1de148de00e8ac38ae59be60b52c516983e
[sdc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.distribution.engine;
21
22 import static org.apache.commons.lang3.StringUtils.isEmpty;
23 import static org.glassfish.jersey.internal.guava.Predicates.not;
24 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
25
26 import com.att.aft.dme2.api.DME2Exception;
27 import com.att.aft.dme2.iterator.DME2EndpointIterator;
28 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
29 import com.att.aft.dme2.manager.registry.DME2Endpoint;
30 import com.att.nsa.apiClient.credentials.ApiCredential;
31 import com.google.common.annotations.VisibleForTesting;
32 import com.google.gson.Gson;
33 import com.google.gson.GsonBuilder;
34 import fj.data.Either;
35 import java.util.Date;
36 import java.util.HashMap;
37 import java.util.HashSet;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Set;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.function.Function;
44 import java.util.function.Supplier;
45 import java.util.stream.Collectors;
46 import javax.annotation.PostConstruct;
47 import org.apache.commons.lang3.StringUtils;
48 import org.apache.http.HttpStatus;
49 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
50 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
51 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
52 import org.openecomp.sdc.be.components.impl.exceptions.ByActionStatusComponentException;
53 import org.openecomp.sdc.be.config.ConfigurationManager;
54 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
55 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
56 import org.openecomp.sdc.be.dao.api.ActionStatus;
57 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
58 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
59 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
60 import org.openecomp.sdc.be.impl.ComponentsUtils;
61 import org.openecomp.sdc.be.info.OperationalEnvInfo;
62 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
63 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
64 import org.openecomp.sdc.common.datastructure.Wrapper;
65 import org.openecomp.sdc.common.http.client.api.HttpResponse;
66 import org.openecomp.sdc.common.log.elements.LogFieldsMdcHandler;
67 import org.openecomp.sdc.common.log.wrappers.Logger;
68 import org.springframework.stereotype.Service;
69
/**
 * Consumes a DMAAP topic and handles the notifications received on it.
 */
73 @Service
74 public class EnvironmentsEngine implements INotificationHandler {
75
76     private static final String MESSAGE_BUS = "MessageBus";
77     private static final String UNKNOWN = "Unknown";
78     private static final Logger log = Logger.getLogger(EnvironmentsEngine.class.getName());
79     private static final String LOG_PARTNER_NAME = "SDC.BE";
80     private static LogFieldsMdcHandler mdcFieldsHandler = new LogFieldsMdcHandler();
81     private final DmaapConsumer dmaapConsumer;
82     private final OperationalEnvironmentDao operationalEnvironmentDao;
83     private final DME2EndpointIteratorCreator epIterCreator;
84     private final AaiRequestHandler aaiRequestHandler;
85     private final ComponentsUtils componentUtils;
86     private final CambriaHandler cambriaHandler;
87     private final DistributionEngineClusterHealth distributionEngineClusterHealth;
88     private final DistributionCompleteReporter distributionCompleteReporter;
89     private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
90     private Map<String, OperationalEnvironmentEntry> environments = new HashMap<>();
91     private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
92     private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
93     private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
94
95     public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao,
96                               DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils,
97                               CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth,
98                               DistributionCompleteReporter distributionCompleteReporter) {
99         this.dmaapConsumer = dmaapConsumer;
100         this.operationalEnvironmentDao = operationalEnvironmentDao;
101         this.epIterCreator = epIterCreator;
102         this.aaiRequestHandler = aaiRequestHandler;
103         this.componentUtils = componentUtils;
104         this.cambriaHandler = cambriaHandler;
105         this.distributionEngineClusterHealth = distributionEngineClusterHealth;
106         this.distributionCompleteReporter = distributionCompleteReporter;
107     }
108
109     @VisibleForTesting
110     @PostConstruct
111     void init() {
112         try {
113             mdcFieldsHandler.addInfoForErrorAndDebugLogging(LOG_PARTNER_NAME);
114             environments = populateEnvironments();
115             createUebTopicsForEnvironments();
116             initDmeGlobalConfig();
117             if (!configurationManager.getConfiguration().getDmaapConsumerConfiguration().isActive()) {
118                 log.info("Environments engine is disabled");
119                 return;
120             }
121             dmaapConsumer
122                 .consumeDmaapTopic(this::handleMessage, (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
123             log.info("Environments engine has been initialized.");
124         } catch (Exception e) {
125             log.error("An error occurred upon consuming topic by Dmaap consumer client.", e);
126         }
127     }
128
129     private void initDmeGlobalConfig() {
130         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration()
131             .getDmaapConsumerConfiguration();
132         if (dmaapConsumerParams == null) {
133             log.warn("cannot read dmaap configuration file,DME might not be initialized properly");
134             return;
135         }
136         System.setProperty("AFT_ENVIRONMENT", dmaapConsumerParams.getAftEnvironment()); // AFTPRD for production
137
138         System.setProperty("AFT_LATITUDE",
139             dmaapConsumerParams.getLatitude() != null ? dmaapConsumerParams.getLatitude().toString() : "1.0"); // Replace with actual latitude
140
141         System.setProperty("AFT_LONGITUDE",
142             dmaapConsumerParams.getLongitude() != null ? dmaapConsumerParams.getLongitude().toString() : "1.0"); // Replace with actual longitude
143     }
144
145     public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
146                                                Map<String, DistributionEngineInitTask> envNamePerInitTask,
147                                                Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
148         connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
149     }
150
151     public void connectUebTopicForDistributionConfTopic(String envName, AtomicBoolean status,
152                                                         Map<String, DistributionEngineInitTask> envNamePerInitTask,
153                                                         Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
154         connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
155     }
156
157     /**
158      * Allows to create and run UEB initializing and polling tasks
159      *
160      * @param status
161      * @param envNamePerInitTask
162      * @param envNamePerPollingTask
163      * @param opEnvEntry
164      */
165     private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
166                                  Map<String, DistributionEngineInitTask> envNamePerInitTask,
167                                  Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
168         String envId = opEnvEntry.getEnvironmentId();
169         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
170             .getDistributionEngineConfiguration();
171         DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(distributionEngineConfiguration,
172             distributionCompleteReporter, componentUtils, distributionEngineClusterHealth, opEnvEntry);
173         String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
174         DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l, distributionEngineConfiguration, envName, status,
175             componentUtils, distributionEnginePollingTask, opEnvEntry);
176         distributionEngineInitTask.startTask();
177         envNamePerInitTask.put(envId, distributionEngineInitTask);
178         envNamePerPollingTask.put(envId, distributionEnginePollingTask);
179         log.debug("Environment envId = {} has been connected to the UEB topic", envId);
180     }
181
182     @Override
183     public boolean handleMessage(String notification) {
184         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration()
185             .getDmaapConsumerConfiguration();
186         Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
187         Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier, dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
188         boolean result;
189         if (eitherTimeOut.isRight()) {
190             result = false;
191         } else {
192             result = eitherTimeOut.left().value();
193         }
194         return result;
195     }
196
197     public boolean handleMessageLogic(String notification) {
198         Wrapper<Boolean> errorWrapper = new Wrapper<>();
199         Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
200         try {
201             log.debug("handle message - for operational environment notification received: {}", notification);
202             Gson gsonObj = new GsonBuilder().create();
203             IDmaapNotificationData notificationData = gsonObj.fromJson(notification, DmaapNotificationDataImpl.class);
204             IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification, DmaapNotificationDataImpl.class);
205             AuditingActionEnum actionEnum;
206             switch (notificationData.getAction()) {
207                 case CREATE:
208                     actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
209                     break;
210                 case UPDATE:
211                     actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
212                     break;
213                 case DELETE:
214                     actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
215                     break;
216                 default:
217                     actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
218                     break;
219             }
220             componentUtils.auditEnvironmentEngine(actionEnum, notificationData.getOperationalEnvironmentId(),
221                 notificationData.getOperationalEnvironmentType().getEventTypenName(), notificationData.getAction().getActionName(),
222                 auditNotificationData.getOperationalEnvironmentName(), auditNotificationData.getTenantContext());
223             if (errorWrapper.isEmpty()) {
224                 validateNotification(errorWrapper, notificationData, auditNotificationData);
225             }
226             // Perform Save In-Progress Dao
227             if (errorWrapper.isEmpty()) {
228                 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
229             }
230             if (errorWrapper.isEmpty()) {
231                 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
232             }
233         } catch (Exception e) {
234             log.debug("handle message for operational environment failed for notification: {} with error :{}", notification, e.getMessage(), e);
235             errorWrapper.setInnerElement(false);
236         }
237         return errorWrapper.isEmpty();
238     }
239
240     private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
241                                       IDmaapAuditNotificationData auditNotificationData) {
242         // Check OperationaEnvironmentType
243         if (errorWrapper.isEmpty()) {
244             validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
245         }
246         // Check Action Type
247         if (errorWrapper.isEmpty()) {
248             validateActionType(errorWrapper, notificationData);
249         }
250         // Check is valid for create/update (not In-Progress state)
251         if (errorWrapper.isEmpty()) {
252             validateState(errorWrapper, notificationData);
253         }
254     }
255
256     public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
257         // Get Env Info From A&AI
258         if (errorWrapper.isEmpty()) {
259             retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
260         }
261         if (errorWrapper.isEmpty()) {
262             // Get List Of UEB Addresses From AFT_DME
263             retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
264         }
265         // Create UEB keys and set them on EnvEntry
266         if (errorWrapper.isEmpty()) {
267             createUebKeys(errorWrapper, opEnvEntry);
268         }
269         // Create Topics
270         if (errorWrapper.isEmpty()) {
271             log.debug("handle message - Create Topics");
272             createUebTopicsForEnvironment(opEnvEntry);
273         }
274         // Save Status Complete and Add to Map
275         if (errorWrapper.isEmpty()) {
276             saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
277         }
278         // Update Environments Map
279         if (errorWrapper.isEmpty()) {
280             environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
281         } else {
282             saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
283         }
284     }
285
286     private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
287         log.debug("handle message - save OperationalEnvironment Failed Status");
288         opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
289         saveOpEnvEntry(errorWrapper, opEnvEntry);
290     }
291
292     void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
293         log.debug("handle message - save OperationalEnvironment Complete Dao");
294         opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
295         saveOpEnvEntry(errorWrapper, opEnvEntry);
296     }
297
298     void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
299         log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
300         log.invoke(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), EnvironmentsEngine.class.getName(),
301             errorWrapper.toString());
302         try {
303             boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
304             if (isKeyFieldsValid) {
305                 String opEnvKey = map2OpEnvKey(opEnvEntry);
306                 List<String> uebHosts = discoverUebHosts(opEnvKey);
307                 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
308                 log.invokeReturn(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), "SDC-BE",
309                     errorWrapper.toString());
310             } else {
311                 errorWrapper.setInnerElement(false);
312                 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
313             }
314         } catch (Exception e) {
315             errorWrapper.setInnerElement(false);
316             log.error("Failed to retrieve Ueb Addresses From DME. ", e);
317         }
318     }
319
320     void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
321         log.debug("handle message - Create UEB keys");
322         List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream().collect(Collectors.toList());
323         Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler.createUebKeys(discoverEndPoints);
324         if (eitherCreateUebKeys.isRight()) {
325             errorWrapper.setInnerElement(false);
326             log.debug("handle message - failed to create UEB Keys");
327         } else {
328             ApiCredential apiCredential = eitherCreateUebKeys.left().value();
329             opEnvEntry.setUebApikey(apiCredential.getApiKey());
330             opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
331         }
332     }
333
334     void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
335         log.debug("handle message - Get Env Info From A&AI");
336         Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(opEnvEntry.getEnvironmentId());
337         if (eitherOperationalEnvInfo.isRight()) {
338             errorWrapper.setInnerElement(false);
339             log.debug("handle message - failed to retrieve details from A&AI");
340         } else {
341             OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
342             opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
343             opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
344         }
345     }
346
347     void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper,
348                                        IDmaapNotificationData notificationData) {
349         log.debug("handle message - save OperationalEnvironment In-Progress Dao");
350         OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
351         // Entry Environment ID holds actually the environment NAME
352         opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
353         opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
354         opEnvEntry.setIsProduction(false);
355         saveOpEnvEntry(errorWrapper, opEnvEntry);
356         opEnvEntryWrapper.setInnerElement(opEnvEntry);
357     }
358
359     void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
360         log.debug("handle message - verify OperationalEnvironment not In-Progress");
361         String opEnvId = notificationData.getOperationalEnvironmentId();
362         Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao.get(opEnvId);
363         if (eitherOpEnv.isLeft()) {
364             final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
365             if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
366                 errorWrapper.setInnerElement(false);
367                 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
368             }
369         } else {
370             CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
371             if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
372                 errorWrapper.setInnerElement(false);
373                 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId, operationStatus);
374             }
375         }
376     }
377
378     void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
379         log.debug("handle message - verify Action Type");
380         DmaapActionEnum action = notificationData.getAction();
381         if (action == DmaapActionEnum.DELETE) {
382             errorWrapper.setInnerElement(false);
383             log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
384         }
385     }
386
387     void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
388                                  IDmaapAuditNotificationData auditNotificationData) {
389         log.debug("handle message - verify OperationaEnvironmentType");
390         OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
391         if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
392             errorWrapper.setInnerElement(false);
393             log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
394             componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE, notificationData.getOperationalEnvironmentId(),
395                 notificationData.getOperationalEnvironmentType().getEventTypenName(), notificationData.getAction().getActionName(),
396                 auditNotificationData.getOperationalEnvironmentName(), auditNotificationData.getTenantContext());
397         }
398     }
399
400     private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
401         entry.setLastModified(new Date(System.currentTimeMillis()));
402         CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
403         if (saveStaus != CassandraOperationStatus.OK) {
404             errorWrapper.setInnerElement(false);
405             log.debug("handle message saving  operational environmet failed for id :{} with error : {}", entry.getEnvironmentId(), saveStaus);
406         }
407     }
408
409     public List<String> discoverUebHosts(String opEnvKey) throws DME2Exception {
410         String lookupUriFormat = configurationManager.getConfiguration().getDmeConfiguration().getLookupUriFormat();
411         String environment = configurationManager.getConfiguration().getDmaapConsumerConfiguration().getEnvironment();
412         String lookupURI = String.format(lookupUriFormat, opEnvKey, environment);
413         log.debug("DME2 GRM URI: {}", lookupURI);
414         List<String> uebHosts = new LinkedList<>();
415         DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
416         // Beginning iteration
417         while (iterator.hasNext()) {
418             DME2EndpointReference ref = iterator.next();
419             DME2Endpoint dmeEndpoint = ref.getEndpoint();
420             log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
421             uebHosts.add(dmeEndpoint.getHost());
422         }
423         return uebHosts;
424     }
425
426     private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
427         return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
428     }
429
430     private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
431         Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
432         OperationalEnvironmentEntry confEntry = readEnvFromConfig();
433         envs.put(confEntry.getEnvironmentId(), confEntry);
434         return envs;
435     }
436
437     private OperationalEnvironmentEntry readEnvFromConfig() {
438         OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
439         DistributionEngineConfiguration distributionEngineConfiguration = configurationManager.getDistributionEngineConfiguration();
440         entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
441         entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
442         Set<String> puebEndpoints = new HashSet<>();
443         puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
444         entry.setDmaapUebAddress(puebEndpoints);
445         String envName =
446             distributionEngineConfiguration.getEnvironments().size() == 1 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
447         entry.setEnvironmentId(envName);
448         entry.setIsProduction(true);
449         if (log.isDebugEnabled()) {
450             log.debug("Enviroment read from configuration: {}", entry);
451         }
452         return entry;
453     }
454
455     private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
456         Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
457             .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
458         if (opEnvResult.isLeft()) {
459             Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
460                 .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
461             resultMap.forEach((key, value) -> log.debug("Enviroment loaded from DB: {}", value));
462             return resultMap;
463         } else {
464             CassandraOperationStatus status = opEnvResult.right().value();
465             log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
466             return new HashMap<>();
467         }
468     }
469
470     void createUebTopicsForEnvironments() {
471         environments.values().stream().filter(not(OperationalEnvironmentEntry::getIsProduction)).forEach(this::createUebTopicsForEnvironment);
472     }
473
474     public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
475         String envId = opEnvEntry.getEnvironmentId();
476         log.debug("Create Environment {} on UEB Topic.", envId);
477         AtomicBoolean status = new AtomicBoolean(false);
478         envNamePerStatus.put(envId, status);
479         connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
480     }
481
482     @VisibleForTesting
483     void setConfigurationManager(ConfigurationManager configurationManager) {
484         this.configurationManager = configurationManager;
485     }
486
487     public Map<String, OperationalEnvironmentEntry> getEnvironments() {
488         return environments;
489     }
490
491     public OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
492         return environments.values().stream().filter(e -> e.getDmaapUebAddress().stream().filter(dmaapUebAddress::contains).findAny().isPresent())
493             .findFirst()
494             .orElseThrow(() -> new ByActionStatusComponentException(ActionStatus.DISTRIBUTION_ENV_DOES_NOT_EXIST, dmaapUebAddress.toString()));
495     }
496
497     public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
498         HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
499         if (resp.getStatusCode() == HttpStatus.SC_OK) {
500             try {
501                 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
502                 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
503                 return Either.left(operationalEnvInfo);
504             } catch (Exception e) {
505                 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
506                 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
507             }
508         } else {
509             log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id, resp.getStatusCode(),
510                 resp.getDescription());
511             return Either.right(resp.getStatusCode());
512         }
513     }
514
515     public OperationalEnvironmentEntry getEnvironmentById(String envId) {
516         return environments.get(envId);
517     }
518
519     public boolean isInMap(OperationalEnvironmentEntry env) {
520         return isInMap(env.getEnvironmentId());
521     }
522
523     public boolean isInMap(String envId) {
524         return environments.containsKey(envId);
525     }
526
527     public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
528         environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
529     }
530 }