re base code
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DmaapConsumer.java
1 package org.openecomp.sdc.be.components.distribution.engine;
2
3 import com.att.nsa.mr.client.MRConsumer;
4 import org.openecomp.sdc.be.config.ConfigurationManager;
5 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
6 import org.openecomp.sdc.common.log.wrappers.Logger;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.stereotype.Service;
9
10 import java.lang.Thread.UncaughtExceptionHandler;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.TimeUnit;
14 import java.util.function.Consumer;
15
16 /**
17  * Allows consuming DMAAP topic according to received consumer parameters
18  * Allows processing received messages.
19  */
20 @Service
21 public class DmaapConsumer {
22     private final ExecutorFactory executorFactory;
23     private final DmaapClientFactory dmaapClientFactory;
24     private static final Logger logger = Logger.getLogger(DmaapClientFactory.class.getName());
25
26     @Autowired
27     private DmaapHealth dmaapHealth;
28     /**
29      * Allows to create an object of type DmaapConsumer
30      * @param executorFactory
31      * @param dmaapClientFactory
32      */
33     @Autowired
34     public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) {
35         this.executorFactory = executorFactory;
36         this.dmaapClientFactory = dmaapClientFactory;
37     }
38
39     /**
40      * Allows consuming DMAAP topic according to received consumer parameters
41      * @param notificationReceived
42      * @param exceptionHandler
43      * @throws Exception
44      */
45     public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
46
47         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
48         String topic = dmaapConsumerParams.getTopic();
49         logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
50         MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
51         ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
52         ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
53
54         pollExecutor.scheduleWithFixedDelay(() -> {
55             logger.info("Trying to fetch messages from topic: {}", topic);
56             boolean isTopicAvailable = false;
57             try {
58                 Iterable<String> messages = consumer.fetch();
59                 isTopicAvailable = true ;
60                 if (messages != null) {
61                     for (String msg : messages) {
62                         logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
63                         notificationExecutor.execute(() -> notificationReceived.accept(msg));
64                     }
65                 }
66                 //successfully fetched
67             }
68             catch (Exception e) {
69                 logger.error("The exception occured upon fetching DMAAP message", e);
70             }
71             dmaapHealth.report( isTopicAvailable );
72         }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);
73     }
74
75 }