re base code
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / EnvironmentsEngine.java
1 package org.openecomp.sdc.be.components.distribution.engine;
2
3 import com.att.aft.dme2.api.DME2Exception;
4 import com.att.aft.dme2.iterator.DME2EndpointIterator;
5 import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
6 import com.att.aft.dme2.manager.registry.DME2Endpoint;
7 import com.att.nsa.apiClient.credentials.ApiCredential;
8 import com.google.common.annotations.VisibleForTesting;
9 import com.google.gson.Gson;
10 import com.google.gson.GsonBuilder;
11 import fj.data.Either;
12 import org.apache.commons.lang3.StringUtils;
13 import org.apache.http.HttpStatus;
14 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
15 import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.OperationaEnvironmentTypeEnum;
16 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
17 import org.openecomp.sdc.be.config.ConfigurationManager;
18 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
19 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
20 import org.openecomp.sdc.be.config.DmeConfiguration;
21 import org.openecomp.sdc.be.dao.cassandra.CassandraOperationStatus;
22 import org.openecomp.sdc.be.dao.cassandra.OperationalEnvironmentDao;
23 import org.openecomp.sdc.be.datatypes.enums.EnvironmentStatusEnum;
24 import org.openecomp.sdc.be.impl.ComponentsUtils;
25 import org.openecomp.sdc.be.info.OperationalEnvInfo;
26 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
27 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
28 import org.openecomp.sdc.common.datastructure.Wrapper;
29 import org.openecomp.sdc.common.http.client.api.HttpResponse;
30 import org.openecomp.sdc.common.log.wrappers.Logger;
31 import org.springframework.stereotype.Service;
32
33 import javax.annotation.PostConstruct;
34 import java.util.*;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.function.Function;
37 import java.util.function.Supplier;
38 import java.util.stream.Collectors;
39
40 import static org.apache.commons.lang3.StringUtils.isEmpty;
41 import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
42
/**
 * Consumes a DMaaP topic and handles the notifications received from it.
 */
46 @Service
47 public class EnvironmentsEngine implements INotificationHandler {
48
49     private static final String MESSAGE_BUS = "MessageBus";
50     private static final String UNKNOWN = "Unknown";
51     private static final Logger log = Logger.getLogger(EnvironmentsEngine.class.getName());
52     private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
53
54     private Map<String, OperationalEnvironmentEntry> environments;
55     private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
56     private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
57     private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
58
59     private final DmaapConsumer dmaapConsumer;
60     private final OperationalEnvironmentDao operationalEnvironmentDao;
61     private final DME2EndpointIteratorCreator epIterCreator;
62     private final AaiRequestHandler aaiRequestHandler;
63     private final ComponentsUtils componentUtils;
64     private final CambriaHandler cambriaHandler;
65     private final DistributionEngineClusterHealth distributionEngineClusterHealth;
66     private final DistributionCompleteReporter distributionCompleteReporter;
67
68     public EnvironmentsEngine(DmaapConsumer dmaapConsumer, OperationalEnvironmentDao operationalEnvironmentDao, DME2EndpointIteratorCreator epIterCreator, AaiRequestHandler aaiRequestHandler, ComponentsUtils componentUtils, CambriaHandler cambriaHandler, DistributionEngineClusterHealth distributionEngineClusterHealth, DistributionCompleteReporter distributionCompleteReporter) {
69         this.dmaapConsumer = dmaapConsumer;
70         this.operationalEnvironmentDao = operationalEnvironmentDao;
71         this.epIterCreator = epIterCreator;
72         this.aaiRequestHandler = aaiRequestHandler;
73         this.componentUtils = componentUtils;
74         this.cambriaHandler = cambriaHandler;
75         this.distributionEngineClusterHealth = distributionEngineClusterHealth;
76         this.distributionCompleteReporter = distributionCompleteReporter;
77     }
78
79     @VisibleForTesting
80     @PostConstruct
81     void init() {
82         log.trace("Environments engine has been initialized. ");
83         try {
84             environments = populateEnvironments();
85             createUebTopicsForEnvironments();
86             initDmeGlobalConfig();
87             dmaapConsumer.consumeDmaapTopic(this::handleMessage,
88                     (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
89         } catch (Exception e) {
90             log.error("An error occurred upon consuming topic by Dmaap consumer client.", e);
91         }
92     }
93
94     private void initDmeGlobalConfig() {
95         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
96         if (dmaapConsumerParams == null) {
97             log.warn("cannot read dmaap configuration file,DME might not be initialized properly");
98             return;
99         }
100         System.setProperty("AFT_ENVIRONMENT", dmaapConsumerParams.getEnvironment()); // AFTPRD for production
101         System.setProperty("AFT_LATITUDE", dmaapConsumerParams.getLatitude()!=null ? dmaapConsumerParams.getLatitude().toString() : "1.0"); // Replace with actual latitude
102         System.setProperty("AFT_LONGITUDE", dmaapConsumerParams.getLongitude()!=null ? dmaapConsumerParams.getLongitude().toString() : "1.0"); // Replace with actual longitude
103     }
104
105     public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
106                                                AtomicBoolean status,
107                                                Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
108         connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
109
110     }
111
112     public void connectUebTopicForDistributionConfTopic(String envName,
113                                                         AtomicBoolean status,
114                                                         Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
115         connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
116
117     }
118
119     /**
120      * Allows to create and run UEB initializing and polling tasks
121      *
122      * @param status
123      * @param envNamePerInitTask
124      * @param envNamePerPollingTask
125      * @param opEnvEntry
126      */
127     private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
128                                  Map<String, DistributionEngineInitTask> envNamePerInitTask,
129                                  Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
130
131         String envId = opEnvEntry.getEnvironmentId();
132
133         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
134                 .getDistributionEngineConfiguration();
135         DistributionEnginePollingTask distributionEnginePollingTask = new DistributionEnginePollingTask(
136                 distributionEngineConfiguration, distributionCompleteReporter, componentUtils, distributionEngineClusterHealth,
137                 opEnvEntry);
138         String envName = configurationManager.getDistributionEngineConfiguration().getEnvironments().get(0);
139         DistributionEngineInitTask distributionEngineInitTask = new DistributionEngineInitTask(0l,
140                 distributionEngineConfiguration, envName, status, componentUtils, distributionEnginePollingTask,
141                 opEnvEntry);
142         distributionEngineInitTask.startTask();
143         envNamePerInitTask.put(envId, distributionEngineInitTask);
144         envNamePerPollingTask.put(envId, distributionEnginePollingTask);
145
146         log.debug("Environment envId = {} has been connected to the UEB topic", envId);
147     }
148
149     @Override
150     public boolean handleMessage(String notification) {
151         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager()
152                 .getConfiguration().getDmaapConsumerConfiguration();
153         Supplier<Boolean> supplier = () -> handleMessageLogic(notification);
154         Either<Boolean, Boolean> eitherTimeOut = runMethodWithTimeOut(supplier,
155                 dmaapConsumerParams.getTimeLimitForNotificationHandleMs());
156
157         boolean result;
158         if (eitherTimeOut.isRight()) {
159             result = false;
160         } else {
161             result = eitherTimeOut.left().value();
162         }
163         return result;
164     }
165
166     public boolean handleMessageLogic(String notification) {
167         Wrapper<Boolean> errorWrapper = new Wrapper<>();
168         Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
169         try {
170
171             log.debug("handle message - for operational environment notification received: {}", notification);
172             Gson gsonObj = new GsonBuilder().create();
173
174             IDmaapNotificationData notificationData = gsonObj.fromJson(notification,
175                     DmaapNotificationDataImpl.class);
176             IDmaapAuditNotificationData auditNotificationData = gsonObj.fromJson(notification,
177                     DmaapNotificationDataImpl.class);
178
179             AuditingActionEnum actionEnum;
180             switch (notificationData.getAction()) {
181                 case CREATE:
182                     actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
183                     break;
184                 case UPDATE:
185                     actionEnum = AuditingActionEnum.UPDATE_ENVIRONMENT;
186                     break;
187                 case DELETE:
188                     actionEnum = AuditingActionEnum.DELETE_ENVIRONMENT;
189                     break;
190                 default:
191                     actionEnum = AuditingActionEnum.UNKNOWN_ENVIRONMENT_NOTIFICATION;
192                     break;
193             }
194             componentUtils.auditEnvironmentEngine(actionEnum,
195                     notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
196                     notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
197                     auditNotificationData.getTenantContext());
198
199             if (errorWrapper.isEmpty()) {
200                 validateNotification(errorWrapper, notificationData, auditNotificationData);
201             }
202             // Perform Save In-Progress Dao
203             if (errorWrapper.isEmpty()) {
204                 saveEntryWithInProgressStatus(errorWrapper, opEnvEntryWrapper, notificationData);
205             }
206
207             if (errorWrapper.isEmpty()) {
208                 buildOpEnv(errorWrapper, opEnvEntryWrapper.getInnerElement());
209             }
210
211         } catch (Exception e) {
212             log.debug("handle message for operational environmet failed for notification: {} with error :{}",
213                     notification, e.getMessage(), e);
214             errorWrapper.setInnerElement(false);
215
216         }
217         return errorWrapper.isEmpty();
218     }
219
220     private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
221                                       IDmaapAuditNotificationData auditNotificationData) {
222         // Check OperationaEnvironmentType
223         if (errorWrapper.isEmpty()) {
224             validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
225         }
226         // Check Action Type
227         if (errorWrapper.isEmpty()) {
228             validateActionType(errorWrapper, notificationData);
229         }
230         // Check is valid for create/update (not In-Progress state)
231         if (errorWrapper.isEmpty()) {
232             validateState(errorWrapper, notificationData);
233         }
234     }
235
236     public void buildOpEnv(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
237         // Get Env Info From A&AI
238         if (errorWrapper.isEmpty()) {
239             retrieveOpEnvInfoFromAAI(errorWrapper, opEnvEntry);
240         }
241
242         if (errorWrapper.isEmpty()) {
243             // Get List Of UEB Addresses From AFT_DME
244             retrieveUebAddressesFromAftDme(errorWrapper, opEnvEntry);
245         }
246
247         // Create UEB keys and set them on EnvEntry
248         if (errorWrapper.isEmpty()) {
249             createUebKeys(errorWrapper, opEnvEntry);
250         }
251
252         // Create Topics
253         if (errorWrapper.isEmpty()) {
254             log.debug("handle message - Create Topics");
255             createUebTopicsForEnvironment(opEnvEntry);
256         }
257
258         // Save Status Complete and Add to Map
259         if (errorWrapper.isEmpty()) {
260             saveEntryWithCompleteStatus(errorWrapper, opEnvEntry);
261         }
262
263         // Update Environments Map
264         if (errorWrapper.isEmpty()) {
265             environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
266         } else {
267             saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
268         }
269     }
270
271     private void saveEntryWithFailedStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
272         log.debug("handle message - save OperationalEnvironment Failed Status");
273         opEnvEntry.setStatus(EnvironmentStatusEnum.FAILED);
274         saveOpEnvEntry(errorWrapper, opEnvEntry);
275     }
276
277     void saveEntryWithCompleteStatus(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
278         log.debug("handle message - save OperationalEnvironment Complete Dao");
279         opEnvEntry.setStatus(EnvironmentStatusEnum.COMPLETED);
280         saveOpEnvEntry(errorWrapper, opEnvEntry);
281
282     }
283
284     void retrieveUebAddressesFromAftDme(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
285         log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
286         try {
287             boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
288             if (isKeyFieldsValid) {
289                 String opEnvKey = map2OpEnvKey(opEnvEntry);
290                 String environmentId = opEnvEntry.getEnvironmentId();
291                 List<String> uebHosts = discoverUebHosts(opEnvKey, environmentId);
292                 opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
293             } else {
294                 errorWrapper.setInnerElement(false);
295                 log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
296             }
297
298         } catch (DME2Exception e) {
299             errorWrapper.setInnerElement(false);
300             log.error("Failed to retrieve Ueb Addresses From DME. ", e);
301         }
302     }
303
304     void createUebKeys(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
305         log.debug("handle message - Create UEB keys");
306         List<String> discoverEndPoints = opEnvEntry.getDmaapUebAddress().stream()
307                 .collect(Collectors.toList());
308         Either<ApiCredential, CambriaErrorResponse> eitherCreateUebKeys = cambriaHandler
309                 .createUebKeys(discoverEndPoints);
310         if (eitherCreateUebKeys.isRight()) {
311             errorWrapper.setInnerElement(false);
312             log.debug("handle message - failed to create UEB Keys");
313         } else {
314             ApiCredential apiCredential = eitherCreateUebKeys.left().value();
315             opEnvEntry.setUebApikey(apiCredential.getApiKey());
316             opEnvEntry.setUebSecretKey(apiCredential.getApiSecret());
317         }
318     }
319
320     void retrieveOpEnvInfoFromAAI(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry opEnvEntry) {
321         log.debug("handle message - Get Env Info From A&AI");
322         Either<OperationalEnvInfo, Integer> eitherOperationalEnvInfo = getOperationalEnvById(
323                 opEnvEntry.getEnvironmentId());
324         if (eitherOperationalEnvInfo.isRight()) {
325             errorWrapper.setInnerElement(false);
326             log.debug("handle message - failed to retrieve details from A&AI");
327         } else {
328             OperationalEnvInfo operationalEnvInfo = eitherOperationalEnvInfo.left().value();
329             opEnvEntry.setEcompWorkloadContext(operationalEnvInfo.getWorkloadContext());
330             opEnvEntry.setTenant(operationalEnvInfo.getTenantContext());
331         }
332     }
333
334     void saveEntryWithInProgressStatus(Wrapper<Boolean> errorWrapper, Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper, IDmaapNotificationData notificationData) {
335         log.debug("handle message - save OperationalEnvironment In-Progress Dao");
336         OperationalEnvironmentEntry opEnvEntry = new OperationalEnvironmentEntry();
337         // Entry Environment ID holds actually the environment NAME
338         opEnvEntry.setEnvironmentId(notificationData.getOperationalEnvironmentId());
339         opEnvEntry.setStatus(EnvironmentStatusEnum.IN_PROGRESS);
340         opEnvEntry.setIsProduction(false);
341         saveOpEnvEntry(errorWrapper, opEnvEntry);
342         opEnvEntryWrapper.setInnerElement(opEnvEntry);
343
344     }
345
346
347     void validateState(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
348         log.debug("handle message - verify OperationalEnvironment not In-Progress");
349         String opEnvId = notificationData.getOperationalEnvironmentId();
350
351         Either<OperationalEnvironmentEntry, CassandraOperationStatus> eitherOpEnv = operationalEnvironmentDao
352                 .get(opEnvId);
353         if (eitherOpEnv.isLeft()) {
354             final OperationalEnvironmentEntry opEnvEntry = eitherOpEnv.left().value();
355             if (StringUtils.equals(opEnvEntry.getStatus(), EnvironmentStatusEnum.IN_PROGRESS.getName())) {
356                 errorWrapper.setInnerElement(false);
357                 log.debug("handle message - validate State Failed Record Found With Status : {} Flow Stopped!", opEnvEntry.getStatus());
358             }
359         } else {
360             CassandraOperationStatus operationStatus = eitherOpEnv.right().value();
361             if (operationStatus != CassandraOperationStatus.NOT_FOUND) {
362                 errorWrapper.setInnerElement(false);
363                 log.debug("failed to retrieve operationa environment with id:{} cassandra error was :{}", opEnvId,
364                         operationStatus);
365             }
366         }
367
368     }
369
370     void validateActionType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData) {
371         log.debug("handle message - verify Action Type");
372         DmaapActionEnum action = notificationData.getAction();
373         if (action == DmaapActionEnum.DELETE) {
374             errorWrapper.setInnerElement(false);
375             log.debug("handle message - validate Action Type Failed With Action Type: {} Flow Stopped!", action);
376         }
377     }
378
379     void validateEnvironmentType(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
380                                  IDmaapAuditNotificationData auditNotificationData) {
381         log.debug("handle message - verify OperationaEnvironmentType");
382         OperationaEnvironmentTypeEnum envType = notificationData.getOperationalEnvironmentType();
383         if (envType != OperationaEnvironmentTypeEnum.ECOMP) {
384             errorWrapper.setInnerElement(false);
385             log.debug("handle message - validate Environment Type Failed With Environment Type: {} Flow Stopped!", envType);
386             componentUtils.auditEnvironmentEngine(AuditingActionEnum.UNSUPPORTED_ENVIRONMENT_TYPE,
387                     notificationData.getOperationalEnvironmentId(), notificationData.getOperationalEnvironmentType().getEventTypenName(),
388                     notificationData.getAction().getActionName(), auditNotificationData.getOperationalEnvironmentName(),
389                     auditNotificationData.getTenantContext());
390         }
391     }
392
393
394     private void saveOpEnvEntry(Wrapper<Boolean> errorWrapper, OperationalEnvironmentEntry entry) {
395         entry.setLastModified(new Date(System.currentTimeMillis()));
396         CassandraOperationStatus saveStaus = operationalEnvironmentDao.save(entry);
397         if (saveStaus != CassandraOperationStatus.OK) {
398             errorWrapper.setInnerElement(false);
399             log.debug("handle message saving  operational environmet failed for id :{} with error : {}",
400                     entry.getEnvironmentId(), saveStaus);
401         }
402     }
403
404     public List<String> discoverUebHosts(String opEnvKey, String env) throws DME2Exception {
405         DmeConfiguration dmeConfiguration = configurationManager.getConfiguration().getDmeConfiguration();
406         List<String> uebHosts = new LinkedList<>();
407
408         String lookupURI = String.format("http://%s/service=%s/version=1.0.0/envContext=%s/partner=*", dmeConfiguration.getDme2Search(), opEnvKey,
409                 env);
410         DME2EndpointIterator iterator = epIterCreator.create(lookupURI);
411
412         // Beginning iteration
413         while (iterator.hasNext()) {
414             DME2EndpointReference ref = iterator.next();
415             DME2Endpoint dmeEndpoint = ref.getEndpoint();
416             log.debug("DME returns EP with UEB host {}, UEB port: {}", dmeEndpoint.getHost(), dmeEndpoint.getPort());
417             uebHosts.add(dmeEndpoint.getHost());
418         }
419
420         return uebHosts;
421     }
422
423     private String map2OpEnvKey(OperationalEnvironmentEntry entry) {
424         return String.format("%s.%s.%s", entry.getTenant(), entry.getEcompWorkloadContext(), MESSAGE_BUS);
425     }
426
427     private Map<String, OperationalEnvironmentEntry> populateEnvironments() {
428         Map<String, OperationalEnvironmentEntry> envs = getEnvironmentsFromDb();
429         OperationalEnvironmentEntry confEntry = readEnvFromConfig();
430         envs.put(confEntry.getEnvironmentId(), confEntry);
431         return envs;
432     }
433
434     private OperationalEnvironmentEntry readEnvFromConfig() {
435         OperationalEnvironmentEntry entry = new OperationalEnvironmentEntry();
436         DistributionEngineConfiguration distributionEngineConfiguration = configurationManager
437                 .getDistributionEngineConfiguration();
438         entry.setUebApikey(distributionEngineConfiguration.getUebPublicKey());
439         entry.setUebSecretKey(distributionEngineConfiguration.getUebSecretKey());
440
441         Set<String> puebEndpoints = new HashSet<>();
442         puebEndpoints.addAll(distributionEngineConfiguration.getUebServers());
443         entry.setDmaapUebAddress(puebEndpoints);
444
445         String envName = distributionEngineConfiguration.getEnvironments().size() == 1
446                 ? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
447         entry.setEnvironmentId(envName);
448
449         if (log.isDebugEnabled()) {
450             log.debug("Enviroment read from configuration: {}", entry);
451         }
452
453         return entry;
454     }
455
456     private Map<String, OperationalEnvironmentEntry> getEnvironmentsFromDb() {
457         Either<List<OperationalEnvironmentEntry>, CassandraOperationStatus> opEnvResult = operationalEnvironmentDao
458                 .getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
459
460         if (opEnvResult.isLeft()) {
461             Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
462                     .collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
463             resultMap.forEach((key, value) -> log.debug("Enviroment loaded from DB: {}", value));
464             return resultMap;
465         } else {
466             CassandraOperationStatus status = opEnvResult.right().value();
467             log.debug("Failed to populate Operation Envirenments Map from Cassandra, DB status: {}", status);
468             return new HashMap<>();
469         }
470     }
471
472     void createUebTopicsForEnvironments() {
473         environments.values().forEach(this::createUebTopicsForEnvironment);
474     }
475
476     public void createUebTopicsForEnvironment(OperationalEnvironmentEntry opEnvEntry) {
477         String envId = opEnvEntry.getEnvironmentId();
478         log.debug("Create Environment {} on UEB Topic.", envId);
479         AtomicBoolean status = new AtomicBoolean(false);
480         envNamePerStatus.put(envId, status);
481
482         connectUebTopicTenantIsolation(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
483     }
484
485     @VisibleForTesting
486     void setConfigurationManager(ConfigurationManager configurationManager) {
487         this.configurationManager = configurationManager;
488     }
489
490     public Map<String, OperationalEnvironmentEntry> getEnvironments() {
491         return environments;
492     }
493
494
495     public Either<OperationalEnvInfo, Integer> getOperationalEnvById(String id) {
496         HttpResponse<String> resp = aaiRequestHandler.getOperationalEnvById(id);
497         if (resp.getStatusCode() == HttpStatus.SC_OK) {
498             try {
499                 OperationalEnvInfo operationalEnvInfo = OperationalEnvInfo.createFromJson(resp.getResponse());
500
501                 log.debug("Get \"{}\" operational environment. {}", id, operationalEnvInfo);
502                 return Either.left(operationalEnvInfo);
503             } catch (Exception e) {
504                 log.debug("Json convert to OperationalEnvInfo failed with exception ", e);
505                 return Either.right(HttpStatus.SC_INTERNAL_SERVER_ERROR);
506             }
507         } else {
508             log.debug("Get \"{}\" operational environment failed with statusCode: {}, description: {}", id,
509                     resp.getStatusCode(), resp.getDescription());
510             return Either.right(resp.getStatusCode());
511         }
512     }
513
514     public OperationalEnvironmentEntry getEnvironmentById(String envId) {
515         return environments.get(envId);
516     }
517
518     public boolean isInMap(OperationalEnvironmentEntry env) {
519         return isInMap(env.getEnvironmentId());
520     }
521
522     public boolean isInMap(String envId) {
523         return environments.containsKey(envId);
524     }
525
526     public void addToMap(OperationalEnvironmentEntry opEnvEntry) {
527         environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
528
529     }
530 }