Catalog alignment
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / EnvironmentsEngine.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import com.att.aft.dme2.api.DME2Exception;
24 import com.att.aft.dme2.iterator.DME2EndpointIterator;
25 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
26 import com.att.aft.dme2.manager.registry.DME2Endpoint;
27 import com.att.nsa.apiClient.credentials.ApiCredential;
28 import com.google.common.annotations.VisibleForTesting;
29 import com.google.gson.Gson;
30 import com.google.gson.GsonBuilder;
31 import fj.data.Either;
32 import org.apache.commons.lang3.StringUtils;
33 import org.apache.http.HttpStatus;
34 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
35 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
36 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
37 import org.openecomp.sdc.be.components.impl.exceptions.ByActionStatusComponentException;
38 import org.openecomp.sdc.be.config.ConfigurationManager;
39 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
40 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
41 import org.openecomp.sdc.be.dao.api.ActionStatus;
42 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
43 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
44 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
45 import org.openecomp.sdc.be.impl.ComponentsUtils;
46 import org.openecomp.sdc.be.info.OperationalEnvInfo;
47 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
48 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
49 import org.openecomp.sdc.common.datastructure.Wrapper;
50 import org.openecomp.sdc.common.http.client.api.HttpResponse;
51 import org.openecomp.sdc.common.log.elements.LogFieldsMdcHandler;
52 import org.openecomp.sdc.common.log.wrappers.Logger;
53 import org.springframework.stereotype.Service;
54
55 import javax.annotation.PostConstruct;
56 import java.util.Date;
57 import java.util.HashMap;
58 import java.util.HashSet;
59 import java.util.LinkedList;
60 import java.util.List;
61 import java.util.Map;
62 import java.util.Set;
63 import java.util.concurrent.atomic.AtomicBoolean;
64 import java.util.function.Function;
65 import java.util.function.Supplier;
66 import java.util.stream.Collectors;
67
68 import static org.apache.commons.lang3.StringUtils.isEmpty;
69 import static org.glassfish.jersey.internal.guava.Predicates.not;
70 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
71
72 /**
73  * Allows to consume DMAAP topic and handle received notifications
74  */
75 @Service
76 public class EnvironmentsEngine implements INotificationHandler {
77
78     private static final String MESSAGE_BUS = "MessageBus";
79     private static final String UNKNOWN = "Unknown";
80     private static final Logger log = Logger.getLogger(EnvironmentsEngine.class.getName());
81     private static final String LOG_PARTNER_NAME = "SDC.BE";
82     private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
83
84     private Map<String, OperationalEnvironmentEntry> environments = new HashMap<>();
85     private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
86     private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
87     private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
88
89     private final DmaapConsumer dmaapConsumer;
90     private final OperationalEnvironmentDao operationalEnvironmentDao;
91     private final DME2EndpointIteratorCreator epIterCreator;
92     private final AaiRequestHandler aaiRequestHandler;
93     private final ComponentsUtils componentUtils;
94     private final CambriaHandler cambriaHandler;
95     private final DistributionEngineClusterHealth distributionEngineClusterHealth;
96     private final DistributionCompleteReporter distributionCompleteReporter;
97     private static LogFieldsMdcHandler mdcFieldsHandler = new LogFieldsMdcHandler();
98
99     public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) {
100         this.dmaapConsumer = dmaapConsumer;
101         this.operationalEnvironmentDao = operationalEnvironmentDao;
102         this.epIterCreator = epIterCreator;
103         this.aaiRequestHandler = aaiRequestHandler;
104         this.componentUtils = componentUtils;
105         this.cambriaHandler = cambriaHandler;
106         this.distributionEngineClusterHealth = distributionEngineClusterHealth;
107         this.distributionCompleteReporter = distributionCompleteReporter;
108     }
109
110     @VisibleForTesting
111     @PostConstruct
112     void init() {
113         try {
114             mdcFieldsHandler.addInfoForErrorAndDebugLogging(LOG_PARTNER_NAME);
115             environments = populateEnvironments();
116             createUebTopicsForEnvironments();
117             initDmeGlobalConfig();
118             if(!configurationManager.getConfiguration().getDmaapConsumerConfiguration().isActive()){
119                 log.info("Environments engine is disabled");
120                 return;
121             }
122             dmaapConsumer.consumeDmaapTopic(this::handleMessage,
123                 (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
124             log.info("Environments engine has been initialized.");
125         } catch (Exception e) {
126             log.error("An error occurred upon consuming topic by Dmaap consumer client.", e);
127         }
128     }
129
130     private void initDmeGlobalConfig() {
131         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
132         if (dmaapConsumerParams == null) {
133             log.warn("cannot read dmaap configuration file,DME might not be initialized properly");
134             return;
135         }
136         System.setProperty("AFT_ENVIRONMENT", dmaapConsumerParams.getAftEnvironment()); // AFTPRD for production
137         System.setProperty("AFT_LATITUDE", dmaapConsumerParams.getLatitude() != null ? dmaapConsumerParams.getLatitude().toString() : "1.0"); // Replace with actual latitude
138         System.setProperty("AFT_LONGITUDE", dmaapConsumerParams.getLongitude() != null ? dmaapConsumerParams.getLongitude().toString() : "1.0"); // Replace with actual longitude
139     }
140
141     public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
142                                                AtomicBoolean status,
143                                                Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
144         connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
145
146     }
147
148     public void connectUebTopicForDistributionConfTopic(String envName,
149                                                         AtomicBoolean status,
150                                                         Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
151         connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
152
153     }
154
155     /**
156      * Allows to create and run UEB initializing and polling tasks
157      *
158      * @param status
159      * @param envNamePerInitTask
160      * @param envNamePerPollingTask
161      * @param opEnvEntry
162      */
163     private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
164                                  Map<String, DistributionEngineInitTask> envNamePerInitTask,
165                                  Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
166
167         String envId = opEnvEntry.getEnvironmentId();
168
169         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
170                 .getDistributionEngineConfiguration();
171         DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(
172                 distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth,
173                 opEnvEntry);
174         String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
175         DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l,
176                 distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask,
177                 opEnvEntry);
178         distributionEngineInitTask.startTask();
179         envNamePerInitTask.put(envId, distributionEngineInitTask);
180         envNamePerPollingTask.put(envId, distributionEnginePollingTask);
181
182         log.debug("Environment envId = {} has been connected to the UEB topic", envId);
183     }
184
185     @Override
186     public boolean handleMessage(String notification) {
187         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager()
188                 .getConfiguration().getDmaapConsumerConfiguration();
189         Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
190         Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier,
191                 dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
192
193         boolean result;
194         if (eitherTimeOut.isRight()) {
195             result = false;
196         } else {
197             result = eitherTimeOut.left().value();
198         }
199         return result;
200     }
201
202     public boolean handleMessageLogic(String notification) {
203         Wrapper<Boolean> errorWrapper = new Wrapper<>();
204         Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
205         try {
206
207             log.debug("handle message - for operational environment notification received: {}", notification);
208             Gson gsonObj = new GsonBuilder().create();
209
210             IDmaapNotificationData notificationData = gsonObj.fromJson(notification,
211                     DmaapNotificationDataImpl.class);
212             IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification,
213                     DmaapNotificationDataImpl.class);
214
215             AuditingActionEnum actionEnum;
216             switch (notificationData.getAction()) {
217                 case CREATE:
218                     actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
219                     break;
220                 case UPDATE:
221                     actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
222                     break;
223                 case DELETE:
224                     actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
225                     break;
226                 default:
227                     actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
228                     break;
229             }
230             componentUtils.auditEnvironmentEngine(actionEnum,
231                     notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
232                     notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
233                     auditNotificationData.getTenantContext());
234
235             if (errorWrapper.isEmpty()) {
236                 validateNotification(errorWrapper, notificationData, auditNotificationData);
237             }
238             // Perform Save In-Progress Dao
239             if (errorWrapper.isEmpty()) {
240                 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
241             }
242
243             if (errorWrapper.isEmpty()) {
244                 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
245             }
246
247         } catch (Exception e) {
248             log.debug("handle message for operational environment failed for notification: {} with error :{}",
249                     notification, e.getMessage(), e);
250             errorWrapper.setInnerElement(false);
251
252         }
253         return errorWrapper.isEmpty();
254     }
255
256     private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
257                                       IDmaapAuditNotificationData auditNotificationData) {
258         // Check OperationaEnvironmentType
259         if (errorWrapper.isEmpty()) {
260             validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
261         }
262         // Check Action Type
263         if (errorWrapper.isEmpty()) {
264             validateActionType(errorWrapper, notificationData);
265         }
266         // Check is valid for create/update (not In-Progress state)
267         if (errorWrapper.isEmpty()) {
268             validateState(errorWrapper, notificationData);
269         }
270     }
271
272     public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
273         // Get Env Info From A&AI
274         if (errorWrapper.isEmpty()) {
275             retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
276         }
277
278         if (errorWrapper.isEmpty()) {
279             // Get List Of UEB Addresses From AFT_DME
280             retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
281         }
282
283         // Create UEB keys and set them on EnvEntry
284         if (errorWrapper.isEmpty()) {
285             createUebKeys(errorWrapper, opEnvEntry);
286         }
287
288         // Create Topics
289         if (errorWrapper.isEmpty()) {
290             log.debug("handle message - Create Topics");
291             createUebTopicsForEnvironment(opEnvEntry);
292         }
293
294         // Save Status Complete and Add to Map
295         if (errorWrapper.isEmpty()) {
296             saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
297         }
298
299         // Update Environments Map
300         if (errorWrapper.isEmpty()) {
301             environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
302         } else {
303             saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
304         }
305     }
306
307     private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
308         log.debug("handle message - save OperationalEnvironment Failed Status");
309         opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
310         saveOpEnvEntry(errorWrapper, opEnvEntry);
311     }
312
313     void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
314         log.debug("handle message - save OperationalEnvironment Complete Dao");
315         opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
316         saveOpEnvEntry(errorWrapper, opEnvEntry);
317
318     }
319
320     void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
321         log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
322         log.invoke(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), EnvironmentsEngine.class.getName(), errorWrapper.toString() );
323         try {
324             boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
325             if (isKeyFieldsValid) {
326                 String opEnvKey = map2OpEnvKey(opEnvEntry);
327                 List<String> uebHosts = discoverUebHosts(opEnvKey);
328                 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
329                 log.invokeReturn(opEnvEntry.getEnvironmentId(), "retrieveUebAddressesFromAftDme", opEnvEntry.getStatus(), "SDC-BE", errorWrapper.toString() );
330             } else {
331                 errorWrapper.setInnerElement(false);
332                 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
333             }
334
335         } catch (Exception e) {
336             errorWrapper.setInnerElement(false);
337             log.error("Failed to retrieve Ueb Addresses From DME. ", e);
338         }
339     }
340
341     void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
342         log.debug("handle message - Create UEB keys");
343         List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream()
344                 .collect(Collectors.toList());
345         Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler
346                 .createUebKeys(discoverEndPoints);
347         if (eitherCreateUebKeys.isRight()) {
348             errorWrapper.setInnerElement(false);
349             log.debug("handle message - failed to create UEB Keys");
350         } else {
351             ApiCredential apiCredential = eitherCreateUebKeys.left().value();
352             opEnvEntry.setUebApikey(apiCredential.getApiKey());
353             opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
354         }
355     }
356
357     void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
358         log.debug("handle message - Get Env Info From A&AI");
359         Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(
360                 opEnvEntry.getEnvironmentId());
361         if (eitherOperationalEnvInfo.isRight()) {
362             errorWrapper.setInnerElement(false);
363             log.debug("handle message - failed to retrieve details from A&AI");
364         } else {
365             OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
366             opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
367             opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
368         }
369     }
370
371     void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) {
372         log.debug("handle message - save OperationalEnvironment In-Progress Dao");
373         OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
374         // Entry Environment ID holds actually the environment NAME
375         opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
376         opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
377         opEnvEntry.setIsProduction(false);
378         saveOpEnvEntry(errorWrapper, opEnvEntry);
379         opEnvEntryWrapper.setInnerElement(opEnvEntry);
380
381     }
382
383
384     void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
385         log.debug("handle message - verify OperationalEnvironment not In-Progress");
386         String opEnvId = notificationData.getOperationalEnvironmentId();
387
388         Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao
389                 .get(opEnvId);
390         if (eitherOpEnv.isLeft()) {
391             final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
392             if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
393                 errorWrapper.setInnerElement(false);
394                 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
395             }
396         } else {
397             CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
398             if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
399                 errorWrapper.setInnerElement(false);
400                 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId,
401                         operationStatus);
402             }
403         }
404
405     }
406
407     void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
408         log.debug("handle message - verify Action Type");
409         DmaapActionEnum action = notificationData.getAction();
410         if (action == DmaapActionEnum.DELETE) {
411             errorWrapper.setInnerElement(false);
412             log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
413         }
414     }
415
416     void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
417                                  IDmaapAuditNotificationData auditNotificationData) {
418         log.debug("handle message - verify OperationaEnvironmentType");
419         OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
420         if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
421             errorWrapper.setInnerElement(false);
422             log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
423             componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE,
424                     notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
425                     notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
426                     auditNotificationData.getTenantContext());
427         }
428     }
429
430
431     private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
432         entry.setLastModified(new Date(System.currentTimeMillis()));
433         CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
434         if (saveStaus != CassandraOperationStatus.OK) {
435             errorWrapper.setInnerElement(false);
436             log.debug("handle message saving  operational environmet failed for id :{} with error : {}",
437                     entry.getEnvironmentId(), saveStaus);
438         }
439     }
440
441     public List<String> discoverUebHosts(String opEnvKey) throws DME2Exception {
442         String lookupUriFormat = configurationManager.getConfiguration().getDmeConfiguration().getLookupUriFormat();
443         String environment = configurationManager.getConfiguration().getDmaapConsumerConfiguration().getEnvironment();
444         String lookupURI = String.format(lookupUriFormat, opEnvKey, environment);
445         log.debug("DME2 GRM URI: {}", lookupURI);
446
447         List<String> uebHosts = new LinkedList<>();
448         DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
449         // Beginning iteration
450         while (iterator.hasNext()) {
451             DME2EndpointReference ref = iterator.next();
452             DME2Endpoint dmeEndpoint = ref.getEndpoint();
453             log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
454             uebHosts.add(dmeEndpoint.getHost());
455         }
456
457         return uebHosts;
458     }
459
460     private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
461         return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
462     }
463
464     private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
465         Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
466         OperationalEnvironmentEntry confEntry = readEnvFromConfig();
467         envs.put(confEntry.getEnvironmentId(), confEntry);
468         return envs;
469     }
470
471     private OperationalEnvironmentEntry readEnvFromConfig() {
472         OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
473         DistributionEngineConfiguration distributionEngineConfiguration = configurationManager
474                 .getDistributionEngineConfiguration();
475         entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
476         entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
477
478         Set<String> puebEndpoints = new HashSet<>();
479         puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
480         entry.setDmaapUebAddress(puebEndpoints);
481
482         String envName = distributionEngineConfiguration.getEnvironments().size() == 1
483                 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
484         entry.setEnvironmentId(envName);
485         entry.setIsProduction(true);
486
487         if (log.isDebugEnabled()) {
488             log.debug("Enviroment read from configuration: {}", entry);
489         }
490
491         return entry;
492     }
493
494     private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
495         Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
496                 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
497
498         if (opEnvResult.isLeft()) {
499             Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
500                     .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
501             resultMap.forEach((key, value) -> log.debug("Enviroment loaded from DB: {}", value));
502             return resultMap;
503         } else {
504             CassandraOperationStatus status = opEnvResult.right().value();
505             log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
506             return new HashMap<>();
507         }
508     }
509
510     void createUebTopicsForEnvironments() {
511         environments.values().stream()
512                 .filter(not(OperationalEnvironmentEntry::getIsProduction))
513                 .forEach(this::createUebTopicsForEnvironment);
514     }
515
516     public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
517         String envId = opEnvEntry.getEnvironmentId();
518         log.debug("Create Environment {} on UEB Topic.", envId);
519         AtomicBoolean status = new AtomicBoolean(false);
520         envNamePerStatus.put(envId, status);
521
522         connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
523     }
524
525     @VisibleForTesting
526     void setConfigurationManager(ConfigurationManager configurationManager) {
527         this.configurationManager = configurationManager;
528     }
529
530     public Map<String, OperationalEnvironmentEntry> getEnvironments() {
531         return environments;
532     }
533
534     public OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
535         return environments.values().stream()
536                 .filter(e -> e.getDmaapUebAddress().stream()
537                     .filter(dmaapUebAddress::contains).findAny().isPresent())
538                 .findFirst()
539                 .orElseThrow(() -> new ByActionStatusComponentException(ActionStatus.DISTRIBUTION_ENV_DOES_NOT_EXIST,dmaapUebAddress.toString()));
540     }
541
542
543
544     public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
545         HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
546         if (resp.getStatusCode() == HttpStatus.SC_OK) {
547             try {
548                 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
549
550                 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
551                 return Either.left(operationalEnvInfo);
552             } catch (Exception e) {
553                 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
554                 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
555             }
556         } else {
557             log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id,
558                     resp.getStatusCode(), resp.getDescription());
559             return Either.right(resp.getStatusCode());
560         }
561     }
562
563     public OperationalEnvironmentEntry getEnvironmentById(String envId) {
564         return environments.get(envId);
565     }
566
567     public boolean isInMap(OperationalEnvironmentEntry env) {
568         return isInMap(env.getEnvironmentId());
569     }
570
571     public boolean isInMap(String envId) {
572         return environments.containsKey(envId);
573     }
574
575     public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
576         environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
577
578     }
579 }