package org.openecomp.sdc.be.components.distribution.engine;
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import javax.annotation.PostConstruct;
-
+import com.att.aft.dme2.api.DME2Exception;
+import com.att.aft.dme2.iterator.DME2EndpointIterator;
+import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
+import com.att.aft.dme2.manager.registry.DME2Endpoint;
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import fj.data.Either;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.openecomp.sdc.be.components.distribution.engine.IDmaapNotificationData.DmaapActionEnum;
import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
import org.openecomp.sdc.common.datastructure.Wrapper;
import org.openecomp.sdc.common.http.client.api.HttpResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.openecomp.sdc.common.log.wrappers.Logger;
import org.springframework.stereotype.Service;
-import com.att.aft.dme2.api.DME2Exception;
-import com.att.aft.dme2.iterator.DME2EndpointIterator;
-import com.att.aft.dme2.iterator.domain.DME2EndpointReference;
-import com.att.aft.dme2.manager.registry.DME2Endpoint;
-import com.att.nsa.apiClient.credentials.ApiCredential;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
-import fj.data.Either;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.openecomp.sdc.common.datastructure.FunctionalInterfaces.runMethodWithTimeOut;
/**
* Allows to consume DMAAP topic and handle received notifications
private static final String MESSAGE_BUS = "MessageBus";
private static final String UNKNOWN = "Unknown";
- private static final Logger log = LoggerFactory.getLogger(EnvironmentsEngine.class);
+ private static final Logger log = Logger.getLogger(EnvironmentsEngine.class.getName());
private ConfigurationManager configurationManager = ConfigurationManager.getConfigurationManager();
private Map<String, OperationalEnvironmentEntry> environments;
try {
environments = populateEnvironments();
createUebTopicsForEnvironments();
+ initDmeGlobalConfig();
dmaapConsumer.consumeDmaapTopic(this::handleMessage,
- (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
+ (t, e) -> log.error("An error occurred upon consuming topic by Dmaap consumer client: ", e));
+ } catch (Exception e) {
+ log.error("An error occurred upon consuming topic by Dmaap consumer client.", e);
}
- catch (Exception e) {
- log.error("An error occurred upon consuming topic by Dmaap consumer client." , e);
+ }
+
+ private void initDmeGlobalConfig() {
+ DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
+ if (dmaapConsumerParams == null) {
+ log.warn("cannot read dmaap configuration file,DME might not be initialized properly");
+ return;
}
+ System.setProperty("AFT_ENVIRONMENT", dmaapConsumerParams.getEnvironment()); // AFTPRD for production
+ System.setProperty("AFT_LATITUDE", dmaapConsumerParams.getLatitude()!=null ? dmaapConsumerParams.getLatitude().toString() : "1.0"); // Replace with actual latitude
+ System.setProperty("AFT_LONGITUDE", dmaapConsumerParams.getLongitude()!=null ? dmaapConsumerParams.getLongitude().toString() : "1.0"); // Replace with actual longitude
}
+
public void connectUebTopicTenantIsolation(OperationalEnvironmentEntry opEnvEntry,
- AtomicBoolean status,
- Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
+ AtomicBoolean status,
+ Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
connectUebTopic(opEnvEntry, status, envNamePerInitTask, envNamePerPollingTask);
}
public void connectUebTopicForDistributionConfTopic(String envName,
- AtomicBoolean status,
- Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask){
+ AtomicBoolean status,
+ Map<String, DistributionEngineInitTask> envNamePerInitTask, Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
connectUebTopic(environments.get(envName), status, envNamePerInitTask, envNamePerPollingTask);
}
+
/**
* Allows to create and run UEB initializing and polling tasks
+ *
* @param status
* @param envNamePerInitTask
* @param envNamePerPollingTask
* @param opEnvEntry
*/
private void connectUebTopic(OperationalEnvironmentEntry opEnvEntry, AtomicBoolean status,
- Map<String, DistributionEngineInitTask> envNamePerInitTask,
- Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
+ Map<String, DistributionEngineInitTask> envNamePerInitTask,
+ Map<String, DistributionEnginePollingTask> envNamePerPollingTask) {
String envId = opEnvEntry.getEnvironmentId();
}
public boolean handleMessageLogic(String notification) {
- Wrapper<Boolean> errorWrapper = new Wrapper<>();
+ Wrapper<Boolean> errorWrapper = new Wrapper<>();
Wrapper<OperationalEnvironmentEntry> opEnvEntryWrapper = new Wrapper<>();
try {
DmaapNotificationDataImpl.class);
AuditingActionEnum actionEnum;
- switch(notificationData.getAction()) {
+ switch (notificationData.getAction()) {
case CREATE:
actionEnum = AuditingActionEnum.CREATE_ENVIRONMENT;
break;
}
private void validateNotification(Wrapper<Boolean> errorWrapper, IDmaapNotificationData notificationData,
- IDmaapAuditNotificationData auditNotificationData) {
+ IDmaapAuditNotificationData auditNotificationData) {
// Check OperationaEnvironmentType
if (errorWrapper.isEmpty()) {
validateEnvironmentType(errorWrapper, notificationData, auditNotificationData);
// Update Environments Map
if (errorWrapper.isEmpty()) {
environments.put(opEnvEntry.getEnvironmentId(), opEnvEntry);
- }
- else{
+ } else {
saveEntryWithFailedStatus(errorWrapper, opEnvEntry);
}
}
log.debug("handle message - Get List Of UEB Addresses From AFT_DME");
try {
boolean isKeyFieldsValid = !isEmpty(opEnvEntry.getTenant()) && !isEmpty(opEnvEntry.getEcompWorkloadContext());
- if( isKeyFieldsValid ){
+ if (isKeyFieldsValid) {
String opEnvKey = map2OpEnvKey(opEnvEntry);
String environmentId = opEnvEntry.getEnvironmentId();
List<String> uebHosts = discoverUebHosts(opEnvKey, environmentId);
opEnvEntry.setDmaapUebAddress(uebHosts.stream().collect(Collectors.toSet()));
- }
- else{
+ } else {
errorWrapper.setInnerElement(false);
log.debug("Can Not Build AFT DME Key from workLoad & Tenant Fields.");
}
? distributionEngineConfiguration.getEnvironments().get(0) : UNKNOWN;
entry.setEnvironmentId(envName);
- if(log.isDebugEnabled()) {
- log.debug("Enviroment read from configuration: {}", entry.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("Enviroment read from configuration: {}", entry);
}
return entry;
.getByEnvironmentsStatus(EnvironmentStatusEnum.COMPLETED);
if (opEnvResult.isLeft()) {
- Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
+ Map<String, OperationalEnvironmentEntry> resultMap = opEnvResult.left().value().stream()
.collect(Collectors.toMap(OperationalEnvironmentEntry::getEnvironmentId, Function.identity()));
- resultMap.forEach( (key, value) -> log.debug("Enviroment loaded from DB: {}", value.toString()) );
+ resultMap.forEach((key, value) -> log.debug("Enviroment loaded from DB: {}", value));
return resultMap;
} else {
CassandraOperationStatus status = opEnvResult.right().value();
}
}
- public OperationalEnvironmentEntry getEnvironmentById (String envId) {
+ public OperationalEnvironmentEntry getEnvironmentById(String envId) {
return environments.get(envId);
}