95379d457e9a45d1afd23fd988d6cada4703ad8e
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * SPDX-License-Identifier: Apache-2.0
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.policy.apex.plugins.event.carrier.kafka;
25
26 import java.util.Properties;
27 import org.apache.kafka.clients.consumer.ConsumerRecord;
28 import org.apache.kafka.clients.consumer.ConsumerRecords;
29 import org.apache.kafka.clients.consumer.KafkaConsumer;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.engine.event.ApexEventException;
32 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
33 import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
34 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This class implements an Apex event consumer that receives events using Kafka.
40  *
41  * @author Liam Fallon (liam.fallon@ericsson.com)
42  */
43 public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
44     // Get a reference to the logger
45     private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class);
46
47     // The Kafka parameters read from the parameter service
48     private KafkaCarrierTechnologyParameters kafkaConsumerProperties;
49
50     // The event receiver that will receive events from this consumer
51     private ApexEventReceiver eventReceiver;
52
53     /**
54      * {@inheritDoc}.
55      */
56     @Override
57     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
58         final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
59         this.eventReceiver = incomingEventReceiver;
60         this.name = consumerName;
61
62         // Check and get the Kafka Properties
63         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
64             throw new ApexEventException("specified consumer properties of type \""
65                 + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
66                 + "\" are not applicable to a Kafka consumer");
67         }
68         kafkaConsumerProperties =
69             (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
70     }
71
72     /**
73      * {@inheritDoc}.
74      */
75     @Override
76     public void run() {
77         // Kick off the Kafka consumer
78         try (KafkaConsumer<String, String> kafkaConsumer =
79             new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
80             kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
81             if (LOGGER.isDebugEnabled()) {
82                 LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name,
83                     kafkaConsumerProperties.getConsumerTopicList());
84             }
85
86             // The endless loop that receives events over Kafka
87             while (consumerThread.isAlive() && !stopOrderedFlag) {
88                 try {
89                     final ConsumerRecords<String, String> records =
90                         kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
91                     for (final ConsumerRecord<String, String> dataRecord : records) {
92                         traceIfTraceEnabled(dataRecord);
93                         eventReceiver.receiveEvent(new Properties(), dataRecord.value());
94                     }
95                 } catch (final Exception e) {
96                     LOGGER.debug("error receiving events on thread {}", consumerThread.getName(), e);
97                 }
98             }
99         }
100     }
101
102     /**
103      * Trace a record if trace is enabled.
104      *
105      * @param dataRecord the record to trace
106      */
107     private void traceIfTraceEnabled(final ConsumerRecord<String, String> dataRecord) {
108         if (LOGGER.isTraceEnabled()) {
109             LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
110                 this.getClass().getName() + ":" + this.name, dataRecord.key(), dataRecord.value());
111         }
112     }
113
114     /**
115      * {@inheritDoc}.
116      */
117     @Override
118     public void stop() {
119         stopOrderedFlag = true;
120
121         while (consumerThread.isAlive()) {
122             ThreadUtilities.sleep(kafkaConsumerProperties.getConsumerPollTime());
123         }
124     }
125 }