2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import io.micrometer.core.instrument.MeterRegistry
21 import kotlinx.coroutines.channels.Channel
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.apache.kafka.clients.consumer.Consumer
26 import org.apache.kafka.clients.consumer.ConsumerRecord
27 import org.apache.kafka.clients.consumer.KafkaConsumer
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.logger
32 import java.time.Duration
33 import kotlin.concurrent.thread
/**
 * Kafka backed [BlueprintMessageConsumerService].
 *
 * A consumer is built from [messageConsumerProperties], subscribed to the requested topics and
 * polled on a dedicated thread named `KafkaConsumer-<clientId>`. Polled records are either
 * forwarded to [channel] (the `subscribe` variants) or handed to a caller supplied
 * [KafkaConsumerRecordsFunction] (the `consume` variants). Polling continues until [shutDown]
 * clears [keepGoing].
 *
 * @param messageConsumerProperties Kafka connection, topic and polling settings.
 * @param meterRegistry registry receiving the consumed / error message counters.
 */
open class KafkaMessageConsumerService(
    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties,
    private val meterRegistry: MeterRegistry
) : BlueprintMessageConsumerService {

    val log = logger(KafkaMessageConsumerService::class)

    /** Channel on which the records polled by `subscribe` are delivered. */
    val channel = Channel<ConsumerRecord<String, ByteArray>>()

    /** Most recently created consumer, `null` until `subscribe` or `consume` has been called. */
    var kafkaConsumer: Consumer<String, ByteArray>? = null

    /** Polling loop flag; written by [shutDown] and read by the polling thread, hence volatile. */
    @Volatile
    var keepGoing = true

    /**
     * Creates a new Kafka consumer from the configured properties.
     *
     * @param additionalConfig entries added to, or overriding, the configured properties.
     * @return a new, not yet subscribed consumer.
     */
    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
        val configProperties = messageConsumerProperties.getConfig()
        /** add or override already set properties */
        additionalConfig?.let { configProperties.putAll(it) }
        /** Create Kafka consumer */
        return KafkaConsumer(configProperties)
    }

    /**
     * Subscribes to the comma separated topics configured in the consumer properties.
     *
     * @param additionalConfig entries added to, or overriding, the configured properties.
     * @return the channel on which consumed records are delivered.
     * @throws IllegalStateException when no topic is configured.
     */
    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
        return subscribe(configuredTopics(), additionalConfig)
    }

    /**
     * Subscribes to [topics] and starts a thread that forwards every polled record to [channel].
     *
     * @param topics topic names to subscribe to.
     * @param additionalConfig entries added to, or overriding, the configured properties.
     * @return the channel on which consumed records are delivered.
     */
    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
        val consumer = createSubscribedConsumer(topics, additionalConfig)

        /** Armed before the thread starts, so a shutDown() racing with start-up is not overwritten */
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            /** use {} closes the consumer when the loop ends, normally or with an exception */
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    /** channel.send suspends, so bridge from the plain thread; returns once the batch is handled */
                    runBlocking {
                        for (consumerRecord in consumerRecords) {
                            launch { forwardToChannel(consumerRecord) }
                        }
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
        return channel
    }

    /**
     * Consumes the comma separated topics configured in the consumer properties.
     *
     * @param additionalConfig entries added to, or overriding, the configured properties.
     * @param consumerFunction must be a [KafkaConsumerRecordsFunction].
     * @throws IllegalStateException when no topic is configured.
     */
    override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
        consume(topics = configuredTopics(), additionalConfig = additionalConfig, consumerFunction = consumerFunction)
    }

    /**
     * Subscribes to [topics] and starts a thread that hands every polled batch to [consumerFunction].
     *
     * @param topics topic names to subscribe to.
     * @param additionalConfig entries added to, or overriding, the configured properties.
     * @param consumerFunction must be a [KafkaConsumerRecordsFunction]; it receives the consumer
     * properties, the consumer itself and the polled records.
     * @throws ClassCastException when [consumerFunction] is not a [KafkaConsumerRecordsFunction].
     */
    override suspend fun consume(
        topics: List<String>,
        additionalConfig: Map<String, Any>?,
        consumerFunction: ConsumerFunction
    ) {
        /** Cast first, so a wrong function type fails before any consumer is created */
        val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
        val consumer = createSubscribedConsumer(topics, additionalConfig)

        /** Armed before the thread starts, so a shutDown() racing with start-up is not overwritten */
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            /** use {} closes the consumer when the loop ends, normally or with an exception */
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    runBlocking {
                        /** Execute dynamic consumer Block substitution */
                        kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
    }

    /**
     * Stops the polling loop, cancels [channel] and waits one poll interval so that a poll
     * already in flight can return.
     */
    override suspend fun shutDown() {
        /** stop the polling loop */
        keepGoing = false
        /** Close the Channel */
        channel.cancel()
        /** To shut down gracefully, wait for the maximum poll time */
        delay(messageConsumerProperties.pollMillSec)
    }

    /**
     * Splits the configured comma separated topic property into trimmed topic names.
     * Blank entries (for example from a trailing comma) are dropped.
     *
     * @throws IllegalStateException when the property is missing or yields no topic name.
     */
    private fun configuredTopics(): List<String> {
        val topicNames = messageConsumerProperties.topic
            ?.split(",")
            ?.map { it.trim() }
            ?.filter { it.isNotEmpty() }
        check(!topicNames.isNullOrEmpty()) { "couldn't get topic information" }
        return topicNames
    }

    /**
     * Creates a consumer, publishes it through [kafkaConsumer] and subscribes it to [topics].
     * When subscribing fails the consumer is closed before the failure is rethrown.
     */
    private fun createSubscribedConsumer(
        topics: List<String>,
        additionalConfig: Map<String, Any>?
    ): Consumer<String, ByteArray> {
        val consumer = kafkaConsumer(additionalConfig)
        kafkaConsumer = consumer
        try {
            consumer.subscribe(topics)
        } catch (e: Exception) {
            /** The polling thread, which normally closes the consumer, has not been started yet */
            try {
                consumer.close()
            } catch (closeError: Exception) {
                e.addSuppressed(closeError)
            }
            throw e
        }
        log.info("Successfully consumed topic($topics)")
        return consumer
    }

    /**
     * Counts [consumerRecord] as consumed and sends it to [channel]; when the channel no longer
     * accepts elements the error counter is incremented instead and the record is dropped.
     */
    private suspend fun forwardToChannel(consumerRecord: ConsumerRecord<String, ByteArray>) {
        val metricTag = BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
        meterRegistry.counter(BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER, metricTag).increment()
        /** execute the command block */
        if (!channel.isClosedForSend) {
            channel.send(consumerRecord)
            /** leaderEpoch is optional: orElse(null) logs "null" instead of throwing when it is absent */
            log.info(
                "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " +
                    "partition(${consumerRecord.partition()}) " +
                    "leaderEpoch(${consumerRecord.leaderEpoch().orElse(null)}) " +
                    "offset(${consumerRecord.offset()}) " +
                    "key(${consumerRecord.key()})"
            )
        } else {
            meterRegistry.counter(BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER, metricTag).increment()
            log.error("Channel is closed to receive message")
        }
    }
}