Catalog alignment
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DmaapConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.components.distribution.engine;
22
23 import com.att.nsa.mr.client.MRConsumer;
24 import org.openecomp.sdc.be.config.ConfigurationManager;
25 import org.openecomp.sdc.be.config.DmaapConsumerConfiguration;
26 import org.openecomp.sdc.common.log.elements.LogFieldsMdcHandler;
27 import org.openecomp.sdc.common.log.wrappers.Logger;
28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.stereotype.Service;
30
31 import java.lang.Thread.UncaughtExceptionHandler;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Consumer;
36
37 /**
38  * Allows consuming DMAAP topic according to received consumer parameters
39  * Allows processing received messages.
40  */
41 @Service
42 public class DmaapConsumer {
43     private static final String LOG_PARTNER_NAME = "SDC.BE";
44     private static final Logger logger = Logger.getLogger(DmaapClientFactory.class.getName());
45     private final ExecutorFactory executorFactory;
46     private final DmaapClientFactory dmaapClientFactory;
47     private final DmaapHealth dmaapHealth;
48     private static LogFieldsMdcHandler mdcFieldsHandler = new LogFieldsMdcHandler();
49     /**
50      * Allows to create an object of type DmaapConsumer
51      * @param executorFactory
52      * @param dmaapClientFactory
53      * @param dmaapHealth
54      */
55     @Autowired
56     public DmaapConsumer(ExecutorFactory executorFactory, DmaapClientFactory dmaapClientFactory,
57         DmaapHealth dmaapHealth) {
58         this.executorFactory = executorFactory;
59         this.dmaapClientFactory = dmaapClientFactory;
60         this.dmaapHealth = dmaapHealth;
61     }
62
63     /**
64      * Allows consuming DMAAP topic according to received consumer parameters
65      * @param notificationReceived
66      * @param exceptionHandler
67      * @throws Exception
68      */
69     public void consumeDmaapTopic(Consumer<String> notificationReceived, UncaughtExceptionHandler exceptionHandler) throws Exception {
70
71         DmaapConsumerConfiguration dmaapConsumerParams = ConfigurationManager.getConfigurationManager().getConfiguration().getDmaapConsumerConfiguration();
72         String topic = dmaapConsumerParams.getTopic();
73         logger.info("Starting to consume topic {} for DMAAP consumer with the next parameters {}. ", topic, dmaapConsumerParams);
74         MRConsumer consumer = dmaapClientFactory.create(dmaapConsumerParams);
75         ScheduledExecutorService pollExecutor = executorFactory.createScheduled(topic + "Client");
76         ExecutorService notificationExecutor = executorFactory.create(topic + "Consumer", exceptionHandler);
77
78         pollExecutor.scheduleWithFixedDelay(() -> {
79             logger.info("Trying to fetch messages from topic: {}", topic);
80             boolean isTopicAvailable = false;
81             mdcFieldsHandler.addInfoForErrorAndDebugLogging(LOG_PARTNER_NAME);
82             try {
83                 Iterable<String> messages = consumer.fetch();
84                 isTopicAvailable = true ;
85                 if (messages != null) {
86                     for (String msg : messages) {
87                         logger.info("The DMAAP message {} received. The topic is {}.", msg, topic);
88                         notificationExecutor.execute(() -> notificationReceived.accept(msg));
89                     }
90                 }
91                 //successfully fetched
92             }
93             catch (Exception e) {
94                 logger.error("The exception occurred upon fetching DMAAP message", e);
95             }
96             dmaapHealth.report( isTopicAvailable );
97         }, 0L, dmaapConsumerParams.getPollingInterval(), TimeUnit.SECONDS);
98     }
99
100 }