1 package org.openecomp.sdc.be.components.distribution.engine;
3 import com.att.nsa.mr.client.MRConsumer;
4 import org.openecomp.sdc.be.config.ConfigurationManager;
5 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.stereotype.Service;
11 import java.lang.Thread.UncaughtExceptionHandler;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.TimeUnit;
15 import java.util.function.Consumer;
18 * Allows consuming DMAAP topic according to received consumer parameters
19 * Allows processing received messages.
22 public class DmaapConsumer {
23 private final ExecutorFactory executorFactory;
24 private final DmaapClientFactory dmaapClientFactory;
25 private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class);
28 private DmaapHealth dmaapHealth;
30 * Allows to create an object of type DmaapConsumer
31 * @param executorFactory
32 * @param dmaapClientFactory
35 public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) {
36 this.executorFactory = executorFactory;
37 this.dmaapClientFactory = dmaapClientFactory;
41 * Allows consuming DMAAP topic according to received consumer parameters
42 * @param notificationReceived
43 * @param exceptionHandler
46 public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
48 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
49 String topic = dmaapConsumerParams.getTopic();
50 logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
51 MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
52 ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
53 ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
55 pollExecutor.scheduleWithFixedDelay(() -> {
56 logger.info("Trying to fetch messages from topic: {}", topic);
57 boolean isTopicAvailable = false;
59 Iterable<String> messages = consumer.fetch();
60 isTopicAvailable = true ;
61 if (messages != null) {
62 for (String msg : messages) {
63 logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
64 notificationExecutor.execute(() -> notificationReceived.accept(msg));
67 //successfully fetched
70 logger.error("The exception {} occured upon fetching DMAAP message", e);
72 dmaapHealth.report( isTopicAvailable );
73 }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);