[SDC] Add kafka native messaging
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DistributionEnginePollingTask.java
index 1246710..ab4400a 100644 (file)
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
 import org.openecomp.sdc.be.config.BeEcompErrorManager;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
@@ -51,6 +52,7 @@ public class DistributionEnginePollingTask implements Runnable {
     private String consumerId;
     private String consumerGroup;
     private CambriaHandler cambriaHandler = new CambriaHandler();
+    private final KafkaHandler kafkaHandler = new KafkaHandler();
     private Gson gson = new GsonBuilder().setPrettyPrinting().create();
     private DistributionCompleteReporter distributionCompleteReporter;
     private ScheduledExecutorService scheduledPollingService = Executors
@@ -82,9 +84,12 @@ public class DistributionEnginePollingTask implements Runnable {
             fetchTimeoutInSec = 15;
         }
         try {
-            cambriaConsumer = cambriaHandler
-                .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
-                    consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+            if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+                cambriaConsumer = cambriaHandler
+                    .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(),
+                        environmentEntry.getUebSecretKey(),
+                        consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+            }
             if (scheduledPollingService != null) {
                 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
                 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
@@ -119,14 +124,20 @@ public class DistributionEnginePollingTask implements Runnable {
     @Override
     public void run() {
         logger.trace("run() method. polling queue {}", topicName);
+        Either<Iterable<String>, CambriaErrorResponse> fetchResult;
         try {
             // init error
-            if (cambriaConsumer == null) {
-                BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
-                stopTask();
-                return;
+            if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+                if (cambriaConsumer == null) {
+                    BeEcompErrorManager.getInstance()
+                        .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
+                    stopTask();
+                    return;
+                }
+                fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
+            } else {
+                fetchResult = kafkaHandler.fetchFromTopic(topicName);
             }
-            Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
             // fetch error
             if (fetchResult.isRight()) {
                 CambriaErrorResponse errorResponse = fetchResult.right().value();