Sync Integ to Master
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DmaapConsumer.java
1 package org.openecomp.sdc.be.components.distribution.engine;
2
3 import com.att.nsa.mr.client.MRConsumer;
4 import org.openecomp.sdc.be.config.ConfigurationManager;
5 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.stereotype.Service;
10
11 import java.lang.Thread.UncaughtExceptionHandler;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.TimeUnit;
15 import java.util.function.Consumer;
16
17 /**
18  * Allows consuming DMAAP topic according to received consumer parameters
19  * Allows processing received messages.
20  */
21 @Service
22 public class DmaapConsumer {
23     private final ExecutorFactory executorFactory;
24     private final DmaapClientFactory dmaapClientFactory;
25     private static final Logger logger = LoggerFactory.getLogger(DmaapClientFactory.class);
26
27     @Autowired
28     private DmaapHealth dmaapHealth;
29     /**
30      * Allows to create an object of type DmaapConsumer
31      * @param executorFactory
32      * @param dmaapClientFactory
33      */
34     @Autowired
35     public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) {
36         this.executorFactory = executorFactory;
37         this.dmaapClientFactory = dmaapClientFactory;
38     }
39
40     /**
41      * Allows consuming DMAAP topic according to received consumer parameters
42      * @param notificationReceived
43      * @param exceptionHandler
44      * @throws Exception
45      */
46     public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
47
48         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
49         String topic = dmaapConsumerParams.getTopic();
50         logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
51         MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
52         ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
53         ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
54
55         pollExecutor.scheduleWithFixedDelay(() -> {
56             logger.info("Trying to fetch messages from topic: {}", topic);
57             boolean isTopicAvailable = false;
58             try {
59                 Iterable<String> messages = consumer.fetch();
60                 isTopicAvailable = true ;
61                 if (messages != null) {
62                     for (String msg : messages) {
63                         logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
64                         notificationExecutor.execute(() -> notificationReceived.accept(msg));
65                     }
66                 }
67                 //successfully fetched
68             }
69             catch (Exception e) {
70                 logger.error("The exception {} occured upon fetching DMAAP message", e);
71             }
72             dmaapHealth.report( isTopicAvailable );
73         }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);
74     }
75
76 }