2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.runBlocking
24 import org.apache.kafka.clients.consumer.Consumer
25 import org.apache.kafka.clients.consumer.ConsumerRecord
26 import org.apache.kafka.clients.consumer.KafkaConsumer
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import java.time.Duration
30 import kotlin.concurrent.thread
/**
 * Kafka implementation of [BlueprintMessageConsumerService].
 *
 * A consumer is created from [messageConsumerProperties] (optionally overridden per call),
 * subscribed to the requested topics and polled on a dedicated thread named
 * `KafkaConsumer-<clientId>`. Polled records are either forwarded into [channel]
 * ([subscribe]) or handed to a caller supplied [KafkaConsumerRecordsFunction] ([consume]).
 */
open class KafkaMessageConsumerService(
    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
) : BlueprintMessageConsumerService {

    val log = logger(KafkaMessageConsumerService::class)

    /** Channel that [subscribe] returns and into which polled records are sent. */
    val channel = Channel<ConsumerRecord<String, ByteArray>>()

    /** Most recently created consumer; assigned by [subscribe] / [consume]. */
    var kafkaConsumer: Consumer<String, ByteArray>? = null

    /**
     * Polling-loop flag: the loop runs while this is `true`; [shutDown] sets it to `false`.
     *
     * NOTE(review): the declaration of this flag was not visible in the reviewed excerpt;
     * its name, visibility and `@Volatile` marker are reconstructed - confirm against the
     * full file before merging.
     */
    @Volatile
    var keepGoing = true

    /**
     * Creates a new [KafkaConsumer] from the configured properties.
     *
     * @param additionalConfig entries that are added to, or override, the configured properties.
     */
    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
        val configProperties = messageConsumerProperties.getConfig()
        /** add or override already set properties */
        additionalConfig?.let { configProperties.putAll(it) }
        /** Create Kafka consumer */
        return KafkaConsumer(configProperties)
    }

    /**
     * Subscribes to the comma separated topics configured in the consumer properties.
     *
     * @throws IllegalStateException when no topic is configured.
     */
    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
        return subscribe(configuredTopics(), additionalConfig)
    }

    /**
     * Subscribes to [topics] and starts a polling thread that sends every polled record
     * into [channel].
     *
     * @return the shared [channel].
     */
    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
        val consumer = subscribedConsumer(topics, additionalConfig)

        // Arm the loop on the calling thread, before the polling thread exists, so a
        // shutDown() issued right after this call cannot be overwritten by a late reset.
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            // use {} closes the consumer when the loop ends or an exception escapes.
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    // channel.send suspends, so the records are sent from coroutines and
                    // runBlocking waits for all of them before the next poll.
                    runBlocking {
                        consumerRecords.forEach { consumerRecord ->
                            launch {
                                /** execute the command block */
                                if (!channel.isClosedForSend) {
                                    channel.send(consumerRecord)
                                    // leaderEpoch() is an Optional that may be empty; orElse(null)
                                    // keeps this eagerly-built message from throwing.
                                    log.trace(
                                        "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " +
                                            "partition(${consumerRecord.partition()}) " +
                                            "leaderEpoch(${consumerRecord.leaderEpoch().orElse(null)}) " +
                                            "offset(${consumerRecord.offset()}) " +
                                            "key(${consumerRecord.key()})"
                                    )
                                } else {
                                    log.error("Channel is closed to receive message")
                                }
                            }
                        }
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
        // NOTE(review): the return statement was not visible in the reviewed excerpt;
        // reconstructed from the declared return type and the shared channel property.
        return channel
    }

    /**
     * Consumes the comma separated topics configured in the consumer properties.
     *
     * @throws IllegalStateException when no topic is configured.
     */
    override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
        return consume(topics = configuredTopics(), additionalConfig = additionalConfig, consumerFunction = consumerFunction)
    }

    /**
     * Subscribes to [topics] and starts a polling thread that passes each polled batch,
     * together with the consumer and the properties, to [consumerFunction].
     *
     * @param consumerFunction must be a [KafkaConsumerRecordsFunction]; any other
     * implementation fails the cast below with a [ClassCastException].
     */
    override suspend fun consume(
        topics: List<String>,
        additionalConfig: Map<String, Any>?,
        consumerFunction: ConsumerFunction
    ) {
        val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction

        val consumer = subscribedConsumer(topics, additionalConfig)

        // Armed before the thread starts, for the same reason as in subscribe().
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    runBlocking {
                        /** Execute dynamic consumer Block substitution */
                        kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
    }

    /**
     * Stops the polling loop, shuts the channel and then waits one poll interval so an
     * in-flight poll can return.
     */
    override suspend fun shutDown() {
        /** stop the polling loop */
        keepGoing = false
        /** Close the Channel */
        // NOTE(review): the exact call on the channel was not visible in the reviewed
        // excerpt; cancel() is assumed here - confirm whether close() was intended.
        channel.cancel()
        /** TO shutdown gracefully, need to wait for the maximum poll time */
        delay(messageConsumerProperties.pollMillSec)
    }

    /**
     * Splits the configured comma separated topic property into trimmed topic names.
     * Blank entries (for example from a trailing comma) are dropped.
     */
    private fun configuredTopics(): List<String> {
        /** get to topic names */
        val consumerTopic = messageConsumerProperties.topic
            ?.split(",")
            ?.map { it.trim() }
            ?.filter { it.isNotEmpty() }
        check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
        return consumerTopic
    }

    /**
     * Creates a consumer, stores it in [kafkaConsumer], subscribes it to [topics] and
     * returns it, so the polling thread works on this instance rather than re-reading
     * the mutable property.
     */
    private fun subscribedConsumer(
        topics: List<String>,
        additionalConfig: Map<String, Any>?
    ): Consumer<String, ByteArray> {
        /** Create Kafka consumer */
        kafkaConsumer = kafkaConsumer(additionalConfig)

        val consumer = checkNotNull(kafkaConsumer) {
            "failed to create kafka consumer for " +
                "server(${messageConsumerProperties.bootstrapServers})'s " +
                "topics($topics)"
        }

        consumer.subscribe(topics)
        log.info("Successfully consumed topic($topics)")
        return consumer
    }
}