17738f19443635a8c858a15d19a3d591c5ad6663
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.kafka;
22
23 import java.util.EnumMap;
24 import java.util.Map;
25 import java.util.Properties;
26
27 import org.apache.kafka.clients.consumer.ConsumerRecord;
28 import org.apache.kafka.clients.consumer.ConsumerRecords;
29 import org.apache.kafka.clients.consumer.KafkaConsumer;
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
33 import org.onap.policy.apex.service.engine.event.ApexEventException;
34 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
35 import org.onap.policy.apex.service.engine.event.PeeredReference;
36 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * This class implements an Apex event consumer that receives events using Kafka.
43  *
44  * @author Liam Fallon (liam.fallon@ericsson.com)
45  */
46 public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
47     // Get a reference to the logger
48     private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class);
49
50     // The Kafka parameters read from the parameter service
51     private KafkaCarrierTechnologyParameters kafkaConsumerProperties;
52
53     // The event receiver that will receive events from this consumer
54     private ApexEventReceiver eventReceiver;
55
56     // The Kafka consumer used to receive events using Kafka
57     private KafkaConsumer<String, String> kafkaConsumer;
58
59     // The name for this consumer
60     private String name = null;
61
62     // The peer references for this event handler
63     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
64
65     // The consumer thread and stopping flag
66     private Thread consumerThread;
67     private boolean stopOrderedFlag = false;
68
69     /**
70      * {@inheritDoc}.
71      */
72     @Override
73     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
74             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
75         this.eventReceiver = incomingEventReceiver;
76         this.name = consumerName;
77
78         // Check and get the Kafka Properties
79         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
80             LOGGER.warn("specified consumer properties of type \""
81                     + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
82                     + "\" are not applicable to a Kafka consumer");
83             throw new ApexEventException("specified consumer properties of type \""
84                     + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
85                     + "\" are not applicable to a Kafka consumer");
86         }
87         kafkaConsumerProperties =
88                 (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
89
90         // Kick off the Kafka consumer
91         kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
92         kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
93         if (LOGGER.isDebugEnabled()) {
94             LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
95                     + kafkaConsumerProperties.getConsumerTopicList());
96         }
97     }
98
99     /**
100      * {@inheritDoc}.
101      */
102     @Override
103     public void start() {
104         // Configure and start the event reception thread
105         final String threadName = this.getClass().getName() + ":" + this.name;
106         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
107         consumerThread.setDaemon(true);
108         consumerThread.start();
109     }
110
111     /**
112      * {@inheritDoc}.
113      */
114     @Override
115     public String getName() {
116         return name;
117     }
118
119     /**
120      * {@inheritDoc}.
121      */
122     @Override
123     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
124         return peerReferenceMap.get(peeredMode);
125     }
126
127     /**
128      * {@inheritDoc}.
129      */
130     @Override
131     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
132         peerReferenceMap.put(peeredMode, peeredReference);
133     }
134
135     /**
136      * {@inheritDoc}.
137      */
138     @Override
139     public void run() {
140         // Kick off the Kafka consumer
141         kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
142         kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
143         if (LOGGER.isDebugEnabled()) {
144             LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
145                     + kafkaConsumerProperties.getConsumerTopicList());
146         }
147
148         // The endless loop that receives events over Kafka
149         while (consumerThread.isAlive() && !stopOrderedFlag) {
150             try {
151                 final ConsumerRecords<String, String> records =
152                         kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
153                 for (final ConsumerRecord<String, String> record : records) {
154                     traceIfTraceEnabled(record);
155                     eventReceiver.receiveEvent(new Properties(), record.value());
156                 }
157             } catch (final Exception e) {
158                 LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
159             }
160         }
161
162         if (!consumerThread.isInterrupted()) {
163             kafkaConsumer.close();
164         }
165     }
166
167     /**
168      * Trace a record if trace is enabled.
169      *
170      * @param record the record to trace
171      */
172     private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
173         if (LOGGER.isTraceEnabled()) {
174             LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
175                     this.getClass().getName() + ":" + this.name, record.key(), record.value());
176         }
177     }
178
179     /**
180      * {@inheritDoc}.
181      */
182     @Override
183     public void stop() {
184         stopOrderedFlag = true;
185
186         while (consumerThread.isAlive()) {
187             ThreadUtilities.sleep(kafkaConsumerProperties.getConsumerPollTime());
188         }
189     }
190 }