Sync Integ to Master
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEngineInitTask.java
index 1eeaa12..1759f69 100644 (file)
 
 package org.openecomp.sdc.be.components.distribution.engine;
 
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import fj.data.Either;
 import org.openecomp.sdc.be.config.BeEcompErrorManager;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
 import org.openecomp.sdc.be.impl.ComponentsUtils;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
 import org.openecomp.sdc.be.resources.data.auditing.AuditingActionEnum;
-import org.openecomp.sdc.common.config.EcompErrorName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import fj.data.Either;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 public class DistributionEngineInitTask implements Runnable {
 
-       public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
-       public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
-       public static final String CONSUMER = "CONSUMER";
-       public static final String PRODUCER = "PRODUCER";
-       public static final String CREATED = "CREATED";
-       public static final String FAILED = "FAILED";
-       public static final Integer HTTP_OK = 200;
-
-       private Long delayBeforeStartFlow = 0l;
-       private DistributionEngineConfiguration deConfiguration;
-       private String envName;
-       private long retryInterval;
-       private long currentRetryInterval;
-       private long maxInterval;
-       // private boolean active = false;
-       boolean maximumRetryInterval = false;
-       private AtomicBoolean status = null;
-       ComponentsUtils componentsUtils = null;
-       DistributionEnginePollingTask distributionEnginePollingTask = null;
-
-       private CambriaHandler cambriaHandler = new CambriaHandler();
-
-       public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask) {
-               super();
-               this.delayBeforeStartFlow = delayBeforeStartFlow;
-               this.deConfiguration = deConfiguration;
-               this.envName = envName;
-               this.retryInterval = deConfiguration.getInitRetryIntervalSec();
-               this.currentRetryInterval = retryInterval;
-               this.maxInterval = deConfiguration.getInitMaxIntervalSec();
-               this.status = status;
-               this.componentsUtils = componentsUtils;
-               this.distributionEnginePollingTask = distributionEnginePollingTask;
-       }
-
-       private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
-
-       private static Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class.getName());
-
-       ScheduledFuture<?> scheduledFuture = null;
-
-       public void startTask() {
-               if (scheduledExecutorService != null) {
-                       Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
-                       logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
-                       this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
-
-               }
-       }
-
-       public void restartTask() {
-
-               this.stopTask();
-
-               logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
-
-               long lastCurrentInterval = currentRetryInterval;
-               incrementRetryInterval();
-
-               this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
-
-       }
-
-       protected void incrementRetryInterval() {
-               if (currentRetryInterval < maxInterval) {
-                       currentRetryInterval *= 2;
-                       if (currentRetryInterval > maxInterval) {
-                               setMaxRetryInterval();
-                       }
-               } else {
-                       setMaxRetryInterval();
-               }
-       }
-
-       private void setMaxRetryInterval() {
-               currentRetryInterval = maxInterval;
-               maximumRetryInterval = true;
-               logger.debug("Set next retry init interval to {}", maxInterval);
-       }
-
-       public void stopTask() {
-               if (scheduledFuture != null) {
-                       boolean result = scheduledFuture.cancel(true);
-                       logger.debug("Stop reinit task. result = {}", result);
-                       if (false == result) {
-                               BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
-                               BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
-                       }
-                       scheduledFuture = null;
-               }
-       }
-
-       public void destroy() {
-               this.stopTask();
-               if (scheduledExecutorService != null) {
-                       scheduledExecutorService.shutdown();
-               }
-       }
-
-       @Override
-       public void run() {
-
-               boolean result = false;
-               result = initFlow();
-
-               if (true == result) {
-                       this.stopTask();
-                       this.status.set(true);
-                       if (this.distributionEnginePollingTask != null) {
-                               String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
-                               logger.debug("start polling distribution status topic {}", topicName);
-                               this.distributionEnginePollingTask.startTask(topicName);
-                       }
-               } else {
-                       if (false == maximumRetryInterval) {
-                               this.restartTask();
-                       }
-               }
-       }
-
-       /**
-        * run initialization flow
-        * 
-        * @return
-        */
-       public boolean initFlow() {
-
-               logger.trace("Start init flow for environment {}", this.envName);
-
-               Set<String> topicsList = null;
-               Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
-
-               getTopicsRes = cambriaHandler.getTopics(deConfiguration.getUebServers());
-               if (getTopicsRes.isRight()) {
-                       CambriaErrorResponse status = getTopicsRes.right().value();
-                       if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
-                               topicsList = new HashSet<>();
-                       } else {
-                               BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
-
-                               BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
-
-                               return false;
-                       }
-               } else {
-                       topicsList = getTopicsRes.left().value();
-               }
-
-               String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
-               logger.debug("Going to handle topic {}", notificationTopic);
-
-               boolean status = createTopicIfNotExists(topicsList, notificationTopic);
-               if (false == status) {
-                       return false;
-               }
-
-               CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
-
-               CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
-
-               if (createStatus != CambriaOperationStatus.OK) {
-                       return false;
-               }
-
-               String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
-               logger.debug("Going to handle topic {}", statusTopic);
-               status = createTopicIfNotExists(topicsList, statusTopic);
-               if (false == status) {
-                       return false;
-               }
-
-               CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
-
-               if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
-                       return false;
-               }
-
-               return true;
-       }
-
-       private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
-               CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(deConfiguration.getUebServers(), topicName, deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), deConfiguration.getUebPublicKey(), subscriberType);
-
-               String role = CONSUMER;
-               if (subscriberType == SubscriberTypeEnum.PRODUCER) {
-                       role = PRODUCER;
-               }
-               auditRegistration(topicName, registerStatus, role);
-               return registerStatus;
-       }
-
-       private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
-               if (componentsUtils != null) {
-                       Integer httpCode = registerProducerStatus.getHttpCode();
-                       String httpCodeStr = String.valueOf(httpCode);
-                       this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, deConfiguration.getUebPublicKey(), httpCodeStr);
-               }
-       }
-
-       private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
-
-               if (topicsList.contains(topicName)) {
-                       if (componentsUtils != null) {
-                               this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
-                       }
-                       return true;
-               }
-
-               CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(deConfiguration.getUebServers(), deConfiguration.getUebPublicKey(), deConfiguration.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
-                               deConfiguration.getCreateTopic().getReplicationCount());
-
-               CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
-               if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
-                       if (componentsUtils != null) {
-                               this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
-                       }
-               } else if (status == CambriaOperationStatus.OK) {
-                       if (componentsUtils != null) {
-                               this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
-                       }
-               } else {
-                       if (componentsUtils != null) {
-                               this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
-                       }
-                       BeEcompErrorManager.getInstance().processEcompError(EcompErrorName.BeUebSystemError, INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
-
-                       BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
-
-                       return false;
-               }
-
-               return true;
-       }
-
-       public static String buildTopicName(String topicName, String environment) {
-               return topicName + "-" + environment.toUpperCase();
-       }
-
-       public boolean isActive() {
-               return this.status.get();
-       }
-
-       public long getCurrentRetryInterval() {
-               return currentRetryInterval;
-       }
-
-       protected void setCambriaHandler(CambriaHandler cambriaHandler) {
-               this.cambriaHandler = cambriaHandler;
-       }
+    public static final String INIT_DISTRIBUTION_ENGINE_FLOW = "initDistributionEngine";
+    public static final String ALREADY_EXISTS = "ALREADY_EXISTS";
+    public static final String CONSUMER = "CONSUMER";
+    public static final String PRODUCER = "PRODUCER";
+    public static final String CREATED = "CREATED";
+    public static final String FAILED = "FAILED";
+    public static final Integer HTTP_OK = 200;
+
+    private Long delayBeforeStartFlow = 0l;
+    private DistributionEngineConfiguration deConfiguration;
+    private String envName;
+    private long retryInterval;
+    private long currentRetryInterval;
+    private long maxInterval;
+    boolean maximumRetryInterval = false;
+    private AtomicBoolean status = null;
+    ComponentsUtils componentsUtils = null;
+    DistributionEnginePollingTask distributionEnginePollingTask = null;
+    private OperationalEnvironmentEntry environmentEntry;
+
+    private CambriaHandler cambriaHandler = new CambriaHandler();
+
+    public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, AtomicBoolean status, ComponentsUtils componentsUtils, DistributionEnginePollingTask distributionEnginePollingTask, OperationalEnvironmentEntry environmentEntry) {
+        super();
+        this.delayBeforeStartFlow = delayBeforeStartFlow;
+        this.deConfiguration = deConfiguration;
+        this.envName = envName;
+        this.retryInterval = deConfiguration.getInitRetryIntervalSec();
+        this.currentRetryInterval = retryInterval;
+        this.maxInterval = deConfiguration.getInitMaxIntervalSec();
+        this.status = status;
+        this.componentsUtils = componentsUtils;
+        this.distributionEnginePollingTask = distributionEnginePollingTask;
+        this.environmentEntry = environmentEntry;
+    }
+
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributionEngineInitTask.class);
+
+    ScheduledFuture<?> scheduledFuture = null;
+
+    public void startTask() {
+        if (scheduledExecutorService != null) {
+            Integer retryInterval = deConfiguration.getInitRetryIntervalSec();
+            logger.debug("Start Distribution Engine init task. retry interval {} seconds, delay before first run {} seconds", retryInterval, delayBeforeStartFlow);
+            this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, delayBeforeStartFlow, retryInterval, TimeUnit.SECONDS);
+
+        }
+    }
+
+    public void restartTask() {
+
+        this.stopTask();
+
+        logger.debug("Start Distribution Engine init task. next run in {} seconds", this.currentRetryInterval);
+
+        long lastCurrentInterval = currentRetryInterval;
+        incrementRetryInterval();
+
+        this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this, lastCurrentInterval, this.currentRetryInterval, TimeUnit.SECONDS);
+
+    }
+
+    protected void incrementRetryInterval() {
+        if (currentRetryInterval < maxInterval) {
+            currentRetryInterval *= 2;
+            if (currentRetryInterval > maxInterval) {
+                setMaxRetryInterval();
+            }
+        } else {
+            setMaxRetryInterval();
+        }
+    }
+
+    private void setMaxRetryInterval() {
+        currentRetryInterval = maxInterval;
+        maximumRetryInterval = true;
+        logger.debug("Set next retry init interval to {}", maxInterval);
+    }
+
+    public void stopTask() {
+        if (scheduledFuture != null) {
+            boolean result = scheduledFuture.cancel(true);
+            logger.debug("Stop reinit task. result = {}", result);
+            if (false == result) {
+                BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to stop the reinit task");
+            }
+            scheduledFuture = null;
+        }
+    }
+
+    public void destroy() {
+        this.stopTask();
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdown();
+        }
+    }
+
+    @Override
+    public void run() {
+
+        boolean result = false;
+        result = initFlow();
+
+        if (true == result) {
+            this.stopTask();
+            this.status.set(true);
+            if (this.distributionEnginePollingTask != null) {
+                String topicName = buildTopicName(deConfiguration.getDistributionStatusTopicName(), envName);
+                logger.debug("start polling distribution status topic {}", topicName);
+                this.distributionEnginePollingTask.startTask(topicName);
+            }
+        } else {
+            if (false == maximumRetryInterval) {
+                this.restartTask();
+            }
+        }
+    }
+
+    /**
+     * run initialization flow
+     *
+     * @return
+     */
+    public boolean initFlow() {
+
+        logger.trace("Start init flow for environment {}", this.envName);
+
+        Set<String> topicsList = null;
+        Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
+
+        getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
+        if (getTopicsRes.isRight()) {
+            CambriaErrorResponse status = getTopicsRes.right().value();
+            if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+                topicsList = new HashSet<>();
+            } else {
+                BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+                return false;
+            }
+        } else {
+            topicsList = getTopicsRes.left().value();
+        }
+
+        String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+        logger.debug("Going to handle topic {}", notificationTopic);
+
+        boolean status = createTopicIfNotExists(topicsList, notificationTopic);
+        if (false == status) {
+            return false;
+        }
+
+        CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
+
+        CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+
+        if (createStatus != CambriaOperationStatus.OK) {
+            return false;
+        }
+
+        String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+        logger.debug("Going to handle topic {}", statusTopic);
+        status = createTopicIfNotExists(topicsList, statusTopic);
+        if (false == status) {
+            return false;
+        }
+
+        CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+
+        if (registerConcumerStatus.getOperationStatus() != CambriaOperationStatus.OK) {
+            return false;
+        }
+
+        return true;
+    }
+
+    private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
+        CambriaErrorResponse registerStatus = cambriaHandler.registerToTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), environmentEntry.getUebApikey(), subscriberType, topicName);
+
+        String role = CONSUMER;
+        if (subscriberType == SubscriberTypeEnum.PRODUCER) {
+            role = PRODUCER;
+        }
+        auditRegistration(topicName, registerStatus, role);
+        return registerStatus;
+    }
+
+    private void auditRegistration(String notificationTopic, CambriaErrorResponse registerProducerStatus, String role) {
+        if (componentsUtils != null) {
+            Integer httpCode = registerProducerStatus.getHttpCode();
+            String httpCodeStr = String.valueOf(httpCode);
+            this.componentsUtils.auditDistributionEngine(AuditingActionEnum.ADD_KEY_TO_TOPIC_ACL, this.envName, notificationTopic, role, environmentEntry.getUebApikey(), httpCodeStr);
+        }
+    }
+
+    private boolean createTopicIfNotExists(Set<String> topicsList, String topicName) {
+
+        if (topicsList.contains(topicName)) {
+            if (componentsUtils != null) {
+                this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
+            }
+            return true;
+        }
+
+        CambriaErrorResponse createDistribTopicStatus = cambriaHandler.createTopic(environmentEntry.getDmaapUebAddress(), environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), topicName, deConfiguration.getCreateTopic().getPartitionCount(),
+                deConfiguration.getCreateTopic().getReplicationCount());
+
+        CambriaOperationStatus status = createDistribTopicStatus.getOperationStatus();
+        if (status == CambriaOperationStatus.TOPIC_ALREADY_EXIST) {
+            if (componentsUtils != null) {
+                this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, ALREADY_EXISTS);
+            }
+        } else if (status == CambriaOperationStatus.OK) {
+            if (componentsUtils != null) {
+                this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, CREATED);
+            }
+        } else {
+            if (componentsUtils != null) {
+                this.componentsUtils.auditDistributionEngine(AuditingActionEnum.CREATE_DISTRIBUTION_TOPIC, this.envName, topicName, null, null, FAILED);
+            }
+            BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try to create topic " + topicName);
+            return false;
+        }
+
+        return true;
+    }
+
+    public static String buildTopicName(String topicName, String environment) {
+        return topicName + "-" + environment.toUpperCase();
+    }
+
+    public boolean isActive() {
+        return this.status.get();
+    }
+
+    public long getCurrentRetryInterval() {
+        return currentRetryInterval;
+    }
+
+    protected void setCambriaHandler(CambriaHandler cambriaHandler) {
+        this.cambriaHandler = cambriaHandler;
+    }
 }