Release version 1.13.7
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DmaapConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.distribution.engine;
21
22 import com.att.nsa.mr.client.MRConsumer;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.Consumer;
28 import org.openecomp.sdc.be.config.ConfigurationManager;
29 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
30 import org.openecomp.sdc.common.log.elements.LogFieldsMdcHandler;
31 import org.openecomp.sdc.common.log.wrappers.Logger;
32 import org.springframework.beans.factory.annotation.Autowired;
33 import org.springframework.stereotype.Service;
34
35 /**
36  * Allows consuming DMAAP topic according to received consumer parameters Allows processing received messages.
37  */
38 @Service
39 public class DmaapConsumer {
40
41     private static final String LOG_PARTNER_NAME = "SDC.BE";
42     private static final Logger logger = Logger.getLogger(DmaapConsumer.class.getName());
43     private static LogFieldsMdcHandler mdcFieldsHandler = new LogFieldsMdcHandler();
44     private final ExecutorFactory executorFactory;
45     private final DmaapClientFactory dmaapClientFactory;
46     private final DmaapHealth dmaapHealth;
47
48     /**
49      * Allows to create an object of type DmaapConsumer
50      *
51      * @param executorFactory
52      * @param dmaapClientFactory
53      * @param dmaapHealth
54      */
55     @Autowired
56     public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory, DmaapHealth dmaapHealth) {
57         this.executorFactory = executorFactory;
58         this.dmaapClientFactory = dmaapClientFactory;
59         this.dmaapHealth = dmaapHealth;
60     }
61
62     /**
63      * Allows consuming DMAAP topic according to received consumer parameters
64      *
65      * @param notificationReceived
66      * @param exceptionHandler
67      * @throws Exception
68      */
69     public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
70         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration()
71             .getDmaapConsumerConfiguration();
72         String topic = dmaapConsumerParams.getTopic();
73         logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
74         MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
75         ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
76         ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
77         pollExecutor.scheduleWithFixedDelay(() -> {
78             logger.info("Trying to fetch messages from topic: {}", topic);
79             boolean isTopicAvailable = false;
80             mdcFieldsHandler.addInfoForErrorAndDebugLogging(LOG_PARTNER_NAME);
81             try {
82                 Iterable<String> messages = consumer.fetch();
83                 isTopicAvailable = true;
84                 if (messages != null) {
85                     for (String msg : messages) {
86                         logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
87                         notificationExecutor.execute(() -> notificationReceived.accept(msg));
88                     }
89                 }
90                 //successfully fetched
91             } catch (Exception e) {
92                 logger.error("The exception occurred upon fetching DMAAP message", e);
93             }
94             dmaapHealth.report(isTopicAvailable);
95         }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);
96     }
97 }