2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.runBlocking
24 import org.apache.kafka.clients.consumer.Consumer
25 import org.apache.kafka.clients.consumer.ConsumerRecord
26 import org.apache.kafka.clients.consumer.KafkaConsumer
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import java.time.Duration
30 import kotlin.concurrent.thread
32 open class KafkaMessageConsumerService(
33 private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
35 BlueprintMessageConsumerService {
37 val log = logger(KafkaMessageConsumerService::class)
38 val channel = Channel<ConsumerRecord<String, ByteArray>>()
39 var kafkaConsumer: Consumer<String, ByteArray>? = null
44 fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
45 val configProperties = messageConsumerProperties.getConfig()
46 /** add or override already set properties */
47 additionalConfig?.let { configProperties.putAll(it) }
48 /** Create Kafka consumer */
49 return KafkaConsumer(configProperties)
52 override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
53 /** get to topic names */
54 val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
55 check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
56 return subscribe(consumerTopic, additionalConfig)
59 override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
60 /** Create Kafka consumer */
61 kafkaConsumer = kafkaConsumer(additionalConfig)
63 checkNotNull(kafkaConsumer) {
64 "failed to create kafka consumer for " +
65 "server(${messageConsumerProperties.bootstrapServers})'s " +
66 "topics(${messageConsumerProperties.bootstrapServers})"
69 kafkaConsumer!!.subscribe(topics)
70 log.info("Successfully consumed topic($topics)")
72 thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
74 kafkaConsumer!!.use { kc ->
76 val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
77 log.trace("Consumed Records : ${consumerRecords.count()}")
79 consumerRecords?.forEach { consumerRecord ->
81 /** execute the command block */
82 if (!channel.isClosedForSend) {
83 channel.send(consumerRecord)
85 log.error("Channel is closed to receive message")
91 log.info("message listener shutting down.....")
97 override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
98 /** get to topic names */
99 val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
100 check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
101 return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction)
104 override suspend fun consume(
105 topics: List<String>,
106 additionalConfig: Map<String, Any>?,
107 consumerFunction: ConsumerFunction
110 val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
112 /** Create Kafka consumer */
113 kafkaConsumer = kafkaConsumer(additionalConfig)
115 checkNotNull(kafkaConsumer) {
116 "failed to create kafka consumer for " +
117 "server(${messageConsumerProperties.bootstrapServers})'s " +
118 "topics(${messageConsumerProperties.bootstrapServers})"
121 kafkaConsumer!!.subscribe(topics)
122 log.info("Successfully consumed topic($topics)")
124 thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
126 kafkaConsumer!!.use { kc ->
128 val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
129 log.trace("Consumed Records : ${consumerRecords.count()}")
131 /** Execute dynamic consumer Block substitution */
132 kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
135 log.info("message listener shutting down.....")
140 override suspend fun shutDown() {
141 /** stop the polling loop */
143 /** Close the Channel */
145 /** TO shutdown gracefully, need to wait for the maximum poll time */
146 delay(messageConsumerProperties.pollMillSec)