Added oparent to sdc main
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DmaapConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import com.att.nsa.mr.client.MRConsumer;
24 import org.openecomp.sdc.be.config.ConfigurationManager;
25 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
26 import org.openecomp.sdc.common.log.wrappers.Logger;
27 import org.springframework.beans.factory.annotation.Autowired;
28 import org.springframework.stereotype.Service;
29
30 import java.lang.Thread.UncaughtExceptionHandler;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.TimeUnit;
34 import java.util.function.Consumer;
35
36 /**
37  * Allows consuming DMAAP topic according to received consumer parameters
38  * Allows processing received messages.
39  */
40 @Service
41 public class DmaapConsumer {
42     private final ExecutorFactory executorFactory;
43     private final DmaapClientFactory dmaapClientFactory;
44     private static final Logger logger = Logger.getLogger(DmaapClientFactory.class.getName());
45
46     @Autowired
47     private DmaapHealth dmaapHealth;
48     /**
49      * Allows to create an object of type DmaapConsumer
50      * @param executorFactory
51      * @param dmaapClientFactory
52      */
53     @Autowired
54     public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory) {
55         this.executorFactory = executorFactory;
56         this.dmaapClientFactory = dmaapClientFactory;
57     }
58
59     /**
60      * Allows consuming DMAAP topic according to received consumer parameters
61      * @param notificationReceived
62      * @param exceptionHandler
63      * @throws Exception
64      */
65     public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
66
67         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
68         String topic = dmaapConsumerParams.getTopic();
69         logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
70         MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
71         ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
72         ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
73
74         pollExecutor.scheduleWithFixedDelay(() -> {
75             logger.info("Trying to fetch messages from topic: {}", topic);
76             boolean isTopicAvailable = false;
77             try {
78                 Iterable<String> messages = consumer.fetch();
79                 isTopicAvailable = true ;
80                 if (messages != null) {
81                     for (String msg : messages) {
82                         logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
83                         notificationExecutor.execute(() -> notificationReceived.accept(msg));
84                     }
85                 }
86                 //successfully fetched
87             }
88             catch (Exception e) {
89                 logger.error("The exception occured upon fetching DMAAP message", e);
90             }
91             dmaapHealth.report( isTopicAvailable );
92         }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);
93     }
94
95 }