9a875e5d48092422e86e24663bd42de788130cc4
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / EnvironmentsEngine.java
1 package org.openecomp.sdc.be.components.distribution.engine;
2
3 import com.att.aft.dme2.api.DME2Exception;
4 import com.att.aft.dme2.iterator.DME2EndpointIterator;
5 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
6 import com.att.aft.dme2.manager.registry.DME2Endpoint;
7 import com.att.nsa.apiClient.credentials.ApiCredential;
8 import com.google.common.annotations.VisibleForTesting;
9 import com.google.gson.Gson;
10 import com.google.gson.GsonBuilder;
11 import fj.data.Either;
12 import org.apache.commons.lang3.StringUtils;
13 import org.apache.http.HttpStatus;
14 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
15 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
16 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
17 import org.openecomp.sdc.be.config.ConfigurationManager;
18 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
19 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
20 import org.openecomp.sdc.be.config.DmeConfiguration;
21 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
22 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
23 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
24 import org.openecomp.sdc.be.impl.ComponentsUtils;
25 import org.openecomp.sdc.be.info.OperationalEnvInfo;
26 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
27 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
28 import org.openecomp.sdc.common.datastructure.Wrapper;
29 import org.openecomp.sdc.common.http.client.api.HttpResponse;
30 import org.openecomp.sdc.common.log.wrappers.Logger;
31 import org.springframework.stereotype.Service;
32
33 import javax.annotation.PostConstruct;
34 import java.util.*;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.function.Function;
37 import java.util.function.Supplier;
38 import java.util.stream.Collectors;
39
40 import static org.apache.commons.lang3.StringUtils.isEmpty;
41 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
42
43 /**
44  * Allows to consume DMAAP topic and handle received notifications
45  */
46 @Service
47 public class EnvironmentsEngine implements INotificationHandler {
48
49     private static final String MESSAGE_BUS = "MessageBus";
50     private static final String UNKNOWN = "Unknown";
51     private static final Logger log = Logger.getLogger(EnvironmentsEngine.class.getName());
52     private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
53
54     private Map<String, OperationalEnvironmentEntry> environments = new HashMap<>();
55     private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
56     private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
57     private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
58
59     private final DmaapConsumer dmaapConsumer;
60     private final OperationalEnvironmentDao operationalEnvironmentDao;
61     private final DME2EndpointIteratorCreator epIterCreator;
62     private final AaiRequestHandler aaiRequestHandler;
63     private final ComponentsUtils componentUtils;
64     private final CambriaHandler cambriaHandler;
65     private final DistributionEngineClusterHealth distributionEngineClusterHealth;
66     private final DistributionCompleteReporter distributionCompleteReporter;
67
68     public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) {
69         this.dmaapConsumer = dmaapConsumer;
70         this.operationalEnvironmentDao = operationalEnvironmentDao;
71         this.epIterCreator = epIterCreator;
72         this.aaiRequestHandler = aaiRequestHandler;
73         this.componentUtils = componentUtils;
74         this.cambriaHandler = cambriaHandler;
75         this.distributionEngineClusterHealth = distributionEngineClusterHealth;
76         this.distributionCompleteReporter = distributionCompleteReporter;
77     }
78
79     @VisibleForTesting
80     @PostConstruct
81     void init() {
82         try {
83             environments = populateEnvironments();
84             createUebTopicsForEnvironments();
85             initDmeGlobalConfig();
86             if(!configurationManager.getConfiguration().getDmaapConsumerConfiguration().isActive()){
87                 log.info("Environments engine is disabled");
88                 return;
89             }
90             dmaapConsumer.consumeDmaapTopic(this::handleMessage,
91                 (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
92             log.info("Environments engine has been initialized.");
93         } catch (Exception e) {
94             log.error("An error occurred upon consuming topic by Dmaap consumer client.", e);
95         }
96     }
97
98     private void initDmeGlobalConfig() {
99         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
100         if (dmaapConsumerParams == null) {
101             log.warn("cannot read dmaap configuration file,DME might not be initialized properly");
102             return;
103         }
104         System.setProperty("AFT_ENVIRONMENT", dmaapConsumerParams.getEnvironment()); // AFTPRD for production
105         System.setProperty("AFT_LATITUDE", dmaapConsumerParams.getLatitude()!=null ? dmaapConsumerParams.getLatitude().toString() : "1.0"); // Replace with actual latitude
106         System.setProperty("AFT_LONGITUDE", dmaapConsumerParams.getLongitude()!=null ? dmaapConsumerParams.getLongitude().toString() : "1.0"); // Replace with actual longitude
107     }
108
109     public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
110                                                AtomicBoolean status,
111                                                Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
112         connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
113
114     }
115
116     public void connectUebTopicForDistributionConfTopic(String envName,
117                                                         AtomicBoolean status,
118                                                         Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
119         connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
120
121     }
122
123     /**
124      * Allows to create and run UEB initializing and polling tasks
125      *
126      * @param status
127      * @param envNamePerInitTask
128      * @param envNamePerPollingTask
129      * @param opEnvEntry
130      */
131     private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
132                                  Map<String, DistributionEngineInitTask> envNamePerInitTask,
133                                  Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
134
135         String envId = opEnvEntry.getEnvironmentId();
136
137         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
138                 .getDistributionEngineConfiguration();
139         DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(
140                 distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth,
141                 opEnvEntry);
142         String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
143         DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l,
144                 distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask,
145                 opEnvEntry);
146         distributionEngineInitTask.startTask();
147         envNamePerInitTask.put(envId, distributionEngineInitTask);
148         envNamePerPollingTask.put(envId, distributionEnginePollingTask);
149
150         log.debug("Environment envId = {} has been connected to the UEB topic", envId);
151     }
152
153     @Override
154     public boolean handleMessage(String notification) {
155         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager()
156                 .getConfiguration().getDmaapConsumerConfiguration();
157         Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
158         Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier,
159                 dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
160
161         boolean result;
162         if (eitherTimeOut.isRight()) {
163             result = false;
164         } else {
165             result = eitherTimeOut.left().value();
166         }
167         return result;
168     }
169
170     public boolean handleMessageLogic(String notification) {
171         Wrapper<Boolean> errorWrapper = new Wrapper<>();
172         Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
173         try {
174
175             log.debug("handle message - for operational environment notification received: {}", notification);
176             Gson gsonObj = new GsonBuilder().create();
177
178             IDmaapNotificationData notificationData = gsonObj.fromJson(notification,
179                     DmaapNotificationDataImpl.class);
180             IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification,
181                     DmaapNotificationDataImpl.class);
182
183             AuditingActionEnum actionEnum;
184             switch (notificationData.getAction()) {
185                 case CREATE:
186                     actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
187                     break;
188                 case UPDATE:
189                     actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
190                     break;
191                 case DELETE:
192                     actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
193                     break;
194                 default:
195                     actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
196                     break;
197             }
198             componentUtils.auditEnvironmentEngine(actionEnum,
199                     notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
200                     notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
201                     auditNotificationData.getTenantContext());
202
203             if (errorWrapper.isEmpty()) {
204                 validateNotification(errorWrapper, notificationData, auditNotificationData);
205             }
206             // Perform Save In-Progress Dao
207             if (errorWrapper.isEmpty()) {
208                 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
209             }
210
211             if (errorWrapper.isEmpty()) {
212                 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
213             }
214
215         } catch (Exception e) {
216             log.debug("handle message for operational environmet failed for notification: {} with error :{}",
217                     notification, e.getMessage(), e);
218             errorWrapper.setInnerElement(false);
219
220         }
221         return errorWrapper.isEmpty();
222     }
223
224     private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
225                                       IDmaapAuditNotificationData auditNotificationData) {
226         // Check OperationaEnvironmentType
227         if (errorWrapper.isEmpty()) {
228             validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
229         }
230         // Check Action Type
231         if (errorWrapper.isEmpty()) {
232             validateActionType(errorWrapper, notificationData);
233         }
234         // Check is valid for create/update (not In-Progress state)
235         if (errorWrapper.isEmpty()) {
236             validateState(errorWrapper, notificationData);
237         }
238     }
239
240     public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
241         // Get Env Info From A&AI
242         if (errorWrapper.isEmpty()) {
243             retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
244         }
245
246         if (errorWrapper.isEmpty()) {
247             // Get List Of UEB Addresses From AFT_DME
248             retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
249         }
250
251         // Create UEB keys and set them on EnvEntry
252         if (errorWrapper.isEmpty()) {
253             createUebKeys(errorWrapper, opEnvEntry);
254         }
255
256         // Create Topics
257         if (errorWrapper.isEmpty()) {
258             log.debug("handle message - Create Topics");
259             createUebTopicsForEnvironment(opEnvEntry);
260         }
261
262         // Save Status Complete and Add to Map
263         if (errorWrapper.isEmpty()) {
264             saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
265         }
266
267         // Update Environments Map
268         if (errorWrapper.isEmpty()) {
269             environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
270         } else {
271             saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
272         }
273     }
274
275     private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
276         log.debug("handle message - save OperationalEnvironment Failed Status");
277         opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
278         saveOpEnvEntry(errorWrapper, opEnvEntry);
279     }
280
281     void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
282         log.debug("handle message - save OperationalEnvironment Complete Dao");
283         opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
284         saveOpEnvEntry(errorWrapper, opEnvEntry);
285
286     }
287
288     void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
289         log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
290         try {
291             boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
292             if (isKeyFieldsValid) {
293                 String opEnvKey = map2OpEnvKey(opEnvEntry);
294                 String environmentId = opEnvEntry.getEnvironmentId();
295                 List<String> uebHosts = discoverUebHosts(opEnvKey, environmentId);
296                 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
297             } else {
298                 errorWrapper.setInnerElement(false);
299                 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
300             }
301
302         } catch (DME2Exception e) {
303             errorWrapper.setInnerElement(false);
304             log.error("Failed to retrieve Ueb Addresses From DME. ", e);
305         }
306     }
307
308     void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
309         log.debug("handle message - Create UEB keys");
310         List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream()
311                 .collect(Collectors.toList());
312         Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler
313                 .createUebKeys(discoverEndPoints);
314         if (eitherCreateUebKeys.isRight()) {
315             errorWrapper.setInnerElement(false);
316             log.debug("handle message - failed to create UEB Keys");
317         } else {
318             ApiCredential apiCredential = eitherCreateUebKeys.left().value();
319             opEnvEntry.setUebApikey(apiCredential.getApiKey());
320             opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
321         }
322     }
323
324     void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
325         log.debug("handle message - Get Env Info From A&AI");
326         Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(
327                 opEnvEntry.getEnvironmentId());
328         if (eitherOperationalEnvInfo.isRight()) {
329             errorWrapper.setInnerElement(false);
330             log.debug("handle message - failed to retrieve details from A&AI");
331         } else {
332             OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
333             opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
334             opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
335         }
336     }
337
338     void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) {
339         log.debug("handle message - save OperationalEnvironment In-Progress Dao");
340         OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
341         // Entry Environment ID holds actually the environment NAME
342         opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
343         opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
344         opEnvEntry.setIsProduction(false);
345         saveOpEnvEntry(errorWrapper, opEnvEntry);
346         opEnvEntryWrapper.setInnerElement(opEnvEntry);
347
348     }
349
350
351     void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
352         log.debug("handle message - verify OperationalEnvironment not In-Progress");
353         String opEnvId = notificationData.getOperationalEnvironmentId();
354
355         Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao
356                 .get(opEnvId);
357         if (eitherOpEnv.isLeft()) {
358             final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
359             if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
360                 errorWrapper.setInnerElement(false);
361                 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
362             }
363         } else {
364             CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
365             if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
366                 errorWrapper.setInnerElement(false);
367                 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId,
368                         operationStatus);
369             }
370         }
371
372     }
373
374     void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
375         log.debug("handle message - verify Action Type");
376         DmaapActionEnum action = notificationData.getAction();
377         if (action == DmaapActionEnum.DELETE) {
378             errorWrapper.setInnerElement(false);
379             log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
380         }
381     }
382
383     void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
384                                  IDmaapAuditNotificationData auditNotificationData) {
385         log.debug("handle message - verify OperationaEnvironmentType");
386         OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
387         if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
388             errorWrapper.setInnerElement(false);
389             log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
390             componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE,
391                     notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
392                     notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
393                     auditNotificationData.getTenantContext());
394         }
395     }
396
397
398     private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
399         entry.setLastModified(new Date(System.currentTimeMillis()));
400         CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
401         if (saveStaus != CassandraOperationStatus.OK) {
402             errorWrapper.setInnerElement(false);
403             log.debug("handle message saving  operational environmet failed for id :{} with error : {}",
404                     entry.getEnvironmentId(), saveStaus);
405         }
406     }
407
408     public List<String> discoverUebHosts(String opEnvKey, String env) throws DME2Exception {
409         DmeConfiguration dmeConfiguration = configurationManager.getConfiguration().getDmeConfiguration();
410         List<String> uebHosts = new LinkedList<>();
411
412         String lookupURI = String.format("http://%s/service=%s/version=1.0.0/envContext=%s/partner=*", dmeConfiguration.getDme2Search(), opEnvKey,
413                 env);
414         DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
415
416         // Beginning iteration
417         while (iterator.hasNext()) {
418             DME2EndpointReference ref = iterator.next();
419             DME2Endpoint dmeEndpoint = ref.getEndpoint();
420             log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
421             uebHosts.add(dmeEndpoint.getHost());
422         }
423
424         return uebHosts;
425     }
426
427     private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
428         return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
429     }
430
431     private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
432         Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
433         OperationalEnvironmentEntry confEntry = readEnvFromConfig();
434         envs.put(confEntry.getEnvironmentId(), confEntry);
435         return envs;
436     }
437
438     private OperationalEnvironmentEntry readEnvFromConfig() {
439         OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
440         DistributionEngineConfiguration distributionEngineConfiguration = configurationManager
441                 .getDistributionEngineConfiguration();
442         entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
443         entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
444
445         Set<String> puebEndpoints = new HashSet<>();
446         puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
447         entry.setDmaapUebAddress(puebEndpoints);
448
449         String envName = distributionEngineConfiguration.getEnvironments().size() == 1
450                 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
451         entry.setEnvironmentId(envName);
452
453         if (log.isDebugEnabled()) {
454             log.debug("Enviroment read from configuration: {}", entry);
455         }
456
457         return entry;
458     }
459
460     private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
461         Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
462                 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
463
464         if (opEnvResult.isLeft()) {
465             Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
466                     .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
467             resultMap.forEach((key, value) -> log.debug("Enviroment loaded from DB: {}", value));
468             return resultMap;
469         } else {
470             CassandraOperationStatus status = opEnvResult.right().value();
471             log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
472             return new HashMap<>();
473         }
474     }
475
476     void createUebTopicsForEnvironments() {
477         environments.values().forEach(this::createUebTopicsForEnvironment);
478     }
479
480     public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
481         String envId = opEnvEntry.getEnvironmentId();
482         log.debug("Create Environment {} on UEB Topic.", envId);
483         AtomicBoolean status = new AtomicBoolean(false);
484         envNamePerStatus.put(envId, status);
485
486         connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
487     }
488
489     @VisibleForTesting
490     void setConfigurationManager(ConfigurationManager configurationManager) {
491         this.configurationManager = configurationManager;
492     }
493
494     public Map<String, OperationalEnvironmentEntry> getEnvironments() {
495         return environments;
496     }
497
498
499     public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
500         HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
501         if (resp.getStatusCode() == HttpStatus.SC_OK) {
502             try {
503                 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
504
505                 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
506                 return Either.left(operationalEnvInfo);
507             } catch (Exception e) {
508                 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
509                 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
510             }
511         } else {
512             log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id,
513                     resp.getStatusCode(), resp.getDescription());
514             return Either.right(resp.getStatusCode());
515         }
516     }
517
518     public OperationalEnvironmentEntry getEnvironmentById(String envId) {
519         return environments.get(envId);
520     }
521
522     public boolean isInMap(OperationalEnvironmentEntry env) {
523         return isInMap(env.getEnvironmentId());
524     }
525
526     public boolean isInMap(String envId) {
527         return environments.containsKey(envId);
528     }
529
530     public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
531         environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
532
533     }
534 }