[SDC] Add kafka native messaging
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEngineInitTask.java
index d61e150..00d3fed 100644 (file)
@@ -20,6 +20,7 @@
 package org.openecomp.sdc.be.components.distribution.engine;
 
 import fj.data.Either;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -27,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
 import org.openecomp.sdc.be.config.BeEcompErrorManager;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
@@ -60,6 +61,7 @@ public class DistributionEngineInitTask implements Runnable {
     private AtomicBoolean status = null;
     private OperationalEnvironmentEntry environmentEntry;
     private CambriaHandler cambriaHandler = new CambriaHandler();
+    private KafkaHandler kafkaHandler = new KafkaHandler();
     private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
     public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
@@ -136,9 +138,7 @@ public class DistributionEngineInitTask implements Runnable {
 
     @Override
     public void run() {
-        boolean result = false;
-        result = initFlow();
-        if (result) {
+        if (initFlow()) {
             this.stopTask();
             this.status.set(true);
             if (this.distributionEnginePollingTask != null) {
@@ -159,38 +159,45 @@ public class DistributionEngineInitTask implements Runnable {
      * @return
      */
     public boolean initFlow() {
-        logger.trace("Start init flow for environment {}", this.envName);
-        Set<String> topicsList = null;
-        Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
-        getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
-        if (getTopicsRes.isRight()) {
-            CambriaErrorResponse status = getTopicsRes.right().value();
-            if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
-                topicsList = new HashSet<>();
+        logger.info("Start init flow for environment {}", this.envName);
+        if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+            Set<String> topicsList;
+            Either<Set<String>, CambriaErrorResponse> getTopicsRes;
+            getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress()));
+            if (getTopicsRes.isRight()) {
+                CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value();
+                if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+                    topicsList = new HashSet<>();
+                } else {
+                    BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW,
+                        "try retrieve list of topics from U-EB server");
+                    return false;
+                }
             } else {
-                BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+                topicsList = getTopicsRes.left().value();
+            }
+            String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+            logger.debug("Going to handle topic {}", notificationTopic);
+            if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
+                return false;
+            }
+            CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic,
+                SubscriberTypeEnum.PRODUCER);
+            CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+            if (createStatus != CambriaOperationStatus.OK) {
+                return false;
+            }
+            String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+            logger.debug("Going to handle topic {}", statusTopic);
+            if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
                 return false;
             }
+            CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+            return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
         } else {
-            topicsList = getTopicsRes.left().value();
+            logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging");
+            return true;
         }
-        String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
-        logger.debug("Going to handle topic {}", notificationTopic);
-        if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
-            return false;
-        }
-        CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
-        CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
-        if (createStatus != CambriaOperationStatus.OK) {
-            return false;
-        }
-        String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
-        logger.debug("Going to handle topic {}", statusTopic);
-        if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
-            return false;
-        }
-        CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
-        return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
     }
 
     private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
@@ -281,4 +288,8 @@ public class DistributionEngineInitTask implements Runnable {
     protected void setCambriaHandler(CambriaHandler cambriaHandler) {
         this.cambriaHandler = cambriaHandler;
     }
+
+    protected void setKafkaHandler(KafkaHandler kafkaHandler) {
+        this.kafkaHandler = kafkaHandler;
+    }
 }