2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.runBlocking
23 import org.apache.kafka.clients.consumer.Consumer
24 import org.apache.kafka.clients.consumer.ConsumerRecord
25 import org.apache.kafka.clients.consumer.KafkaConsumer
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
28 import java.time.Duration
29 import kotlin.concurrent.thread
31 open class KafkaMessageConsumerService(
32 private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
34 BlueprintMessageConsumerService {
// Logger dedicated to this service class.
36 val log = logger(KafkaMessageConsumerService::class)
// Channel the subscribe(topics, ...) polling thread sends each consumed record into; built with
// Channel's default arguments.
37 val channel = Channel<ConsumerRecord<String, ByteArray>>()
// Initially null; assigned a freshly built consumer by subscribe(topics, ...) and consume(topics, ...).
38 var kafkaConsumer: Consumer<String, ByteArray>? = null
/**
 * Builds a new [KafkaConsumer] from the configuration exposed by the consumer properties.
 *
 * @param additionalConfig optional entries written on top of the base configuration; an entry
 * whose key is already present replaces the existing value. Nothing is added when `null`.
 * @return the newly constructed consumer
 */
fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
    val effectiveConfig = messageConsumerProperties.getConfig()
    // Caller-supplied entries win over whatever the base configuration already holds.
    if (additionalConfig != null) {
        effectiveConfig.putAll(additionalConfig)
    }
    return KafkaConsumer(effectiveConfig)
}
/**
 * Subscribes to the topics named in the consumer properties.
 *
 * The `topic` property is read as a comma-separated list. Each entry is trimmed and entries
 * that end up empty are discarded, so a blank property or a stray comma (`"a,,b"`) can no
 * longer hand an empty topic name to the Kafka client.
 *
 * @param additionalConfig forwarded unchanged to the topic-list overload
 * @return whatever the topic-list overload returns
 * @throws IllegalStateException when the property yields no usable topic name
 */
override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
    val consumerTopic = messageConsumerProperties.topic
        ?.split(",")
        ?.map { it.trim() }
        // "".split(",") yields [""], which would otherwise slip past the emptiness check below.
        ?.filter { it.isNotEmpty() }
    check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
    return subscribe(consumerTopic, additionalConfig)
}
// Creates a Kafka consumer, subscribes it to `topics`, and starts a named thread that polls for
// records and sends each one to `channel` while the channel still accepts sends.
// NOTE(review): this view of the function looks truncated. No loop around poll(), no coroutine
// bridge around the suspending channel.send(), no `else` ahead of the "Channel is closed" log,
// no closing braces and no return statement are visible. Confirm against the complete file
// before changing anything here.
58 override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
59 /** Create Kafka consumer */
60 kafkaConsumer = kafkaConsumer(additionalConfig)
// NOTE(review): kafkaConsumer(...) is declared to return a non-null Consumer, so this check does
// not look like it can fail. The message also prints bootstrapServers in the "topics(...)" slot,
// where `topics` was presumably intended.
62 checkNotNull(kafkaConsumer) {
63 "failed to create kafka consumer for " +
64 "server(${messageConsumerProperties.bootstrapServers})'s " +
65 "topics(${messageConsumerProperties.bootstrapServers})"
68 kafkaConsumer!!.subscribe(topics)
// NOTE(review): at this point the consumer has only subscribed; nothing has been polled yet.
69 log.info("Successfully consumed topic($topics)")
// Polling runs on its own thread, named after the configured client id.
71 thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
// `use` closes the consumer when the block exits, whether normally or by exception.
73 kafkaConsumer!!.use { kc ->
// Each poll waits at most pollMillSec milliseconds for records.
75 val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
76 log.trace("Consumed Records : ${consumerRecords.count()}")
78 consumerRecords?.forEach { consumerRecord ->
79 /** execute the command block */
// Only send while the channel is still open for sending.
80 if (!channel.isClosedForSend) {
81 channel.send(consumerRecord)
83 log.error("Channel is closed to receive message")
88 log.info("message listener shutting down.....")
/**
 * Consumes from the topics named in the consumer properties.
 *
 * The `topic` property is read as a comma-separated list. Each entry is trimmed and entries
 * that end up empty are discarded, so a blank property or a stray comma (`"a,,b"`) can no
 * longer hand an empty topic name to the Kafka client.
 *
 * @param additionalConfig forwarded unchanged to the topic-list overload
 * @param consumerFunction forwarded unchanged to the topic-list overload
 * @throws IllegalStateException when the property yields no usable topic name
 */
override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
    val consumerTopic = messageConsumerProperties.topic
        ?.split(",")
        ?.map { it.trim() }
        // "".split(",") yields [""], which would otherwise slip past the emptiness check below.
        ?.filter { it.isNotEmpty() }
    check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
    return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction)
}
// Creates a Kafka consumer, subscribes it to `topics`, and starts a named thread that polls for
// records and passes every polled batch to the supplied consumer function.
// NOTE(review): this view of the function looks truncated. The end of the signature, any loop
// around poll(), and the closing braces are not visible. Confirm against the complete file
// before changing anything here.
101 override suspend fun consume(
102 topics: List<String>,
103 additionalConfig: Map<String, Any>?,
104 consumerFunction: ConsumerFunction
// Unchecked cast: throws ClassCastException when the caller passes any other ConsumerFunction.
107 val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
109 /** Create Kafka consumer */
110 kafkaConsumer = kafkaConsumer(additionalConfig)
// NOTE(review): kafkaConsumer(...) is declared to return a non-null Consumer, so this check does
// not look like it can fail. The message also prints bootstrapServers in the "topics(...)" slot,
// where `topics` was presumably intended.
112 checkNotNull(kafkaConsumer) {
113 "failed to create kafka consumer for " +
114 "server(${messageConsumerProperties.bootstrapServers})'s " +
115 "topics(${messageConsumerProperties.bootstrapServers})"
118 kafkaConsumer!!.subscribe(topics)
// NOTE(review): at this point the consumer has only subscribed; nothing has been polled yet.
119 log.info("Successfully consumed topic($topics)")
// Polling runs on its own thread, named after the configured client id.
121 thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
// `use` closes the consumer when the block exits, whether normally or by exception.
123 kafkaConsumer!!.use { kc ->
// Each poll waits at most pollMillSec milliseconds for records.
125 val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
126 log.trace("Consumed Records : ${consumerRecords.count()}")
128 /** Execute dynamic consumer Block substitution */
// The function receives the properties, the live consumer and the polled batch.
129 kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
132 log.info("message listener shutting down.....")
// Shuts the consumer service down. The visible part suspends for one poll interval
// (pollMillSec) at the end.
// NOTE(review): the statements that should follow the first two comments below (stopping the
// loop, closing the channel) and the closing brace are not visible in this view. Confirm
// against the complete file.
137 override suspend fun shutDown() {
138 /** stop the polling loop */
140 /** Close the Channel */
142 /** TO shutdown gracefully, need to wait for the maximum poll time */
143 delay(messageConsumerProperties.pollMillSec)