1 package org.openecomp.sdc.be.components.distribution.engine;
3 import com.att.nsa.mr.client.MRConsumer;
4 import org.openecomp.sdc.be.config.ConfigurationManager;
5 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
6 import org.openecomp.sdc.common.log.wrappers.Logger;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.stereotype.Service;
10 import java.lang.Thread.UncaughtExceptionHandler;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
14 import java.util.function.Consumer;
17 * Allows consuming DMAAP topic according to received consumer parameters
18 * Allows processing received messages.
21 public class DmaapConsumer {
22 private final ExecutorFactory executorFactory;
23 private final DmaapClientFactory dmaapClientFactory;
24 private static final Logger logger = Logger.getLogger(DmaapClientFactory.class.getName());
27 private DmaapHealth dmaapHealth;
29 * Allows to create an object of type DmaapConsumer
30 * @param executorFactory
31 * @param dmaapClientFactory
34 public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) {
35 this.executorFactory = executorFactory;
36 this.dmaapClientFactory = dmaapClientFactory;
40 * Allows consuming DMAAP topic according to received consumer parameters
41 * @param notificationReceived
42 * @param exceptionHandler
45 public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
47 DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
48 String topic = dmaapConsumerParams.getTopic();
49 logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
50 MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
51 ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
52 ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
54 pollExecutor.scheduleWithFixedDelay(() -> {
55 logger.info("Trying to fetch messages from topic: {}", topic);
56 boolean isTopicAvailable = false;
58 Iterable<String> messages = consumer.fetch();
59 isTopicAvailable = true ;
60 if (messages != null) {
61 for (String msg : messages) {
62 logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
63 notificationExecutor.execute(() -> notificationReceived.accept(msg));
66 //successfully fetched
69 logger.error("The exception occured upon fetching DMAAP message", e);
71 dmaapHealth.report( isTopicAvailable );
72 }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);