import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
private String consumerId;
private String consumerGroup;
private CambriaHandler cambriaHandler = new CambriaHandler();
+ private final KafkaHandler kafkaHandler = new KafkaHandler();
private Gson gson = new GsonBuilder().setPrettyPrinting().create();
private DistributionCompleteReporter distributionCompleteReporter;
private ScheduledExecutorService scheduledPollingService = Executors
fetchTimeoutInSec = 15;
}
try {
- cambriaConsumer = cambriaHandler
- .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
- consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ cambriaConsumer = cambriaHandler
+ .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(),
+ environmentEntry.getUebSecretKey(),
+ consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+ }
if (scheduledPollingService != null) {
logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
@Override
public void run() {
logger.trace("run() method. polling queue {}", topicName);
+ Either<Iterable<String>, CambriaErrorResponse> fetchResult;
try {
// init error
- if (cambriaConsumer == null) {
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
- stopTask();
- return;
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ if (cambriaConsumer == null) {
+ BeEcompErrorManager.getInstance()
+ .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
+ stopTask();
+ return;
+ }
+ fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
+ } else {
+ fetchResult = kafkaHandler.fetchFromTopic(topicName);
}
- Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
// fetch error
if (fetchResult.isRight()) {
CambriaErrorResponse errorResponse = fetchResult.right().value();