2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.delay
22 import kotlinx.coroutines.launch
23 import kotlinx.coroutines.runBlocking
24 import org.apache.kafka.clients.consumer.Consumer
25 import org.apache.kafka.clients.consumer.KafkaConsumer
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
28 import java.nio.charset.Charset
29 import java.time.Duration
30 import kotlin.concurrent.thread
/**
 * Kafka implementation of [BlueprintMessageConsumerService].
 *
 * A consumer is created from [messageConsumerProperties], subscribed to the requested topics and
 * polled on a dedicated thread until [shutDown] is called. Records are either forwarded as strings
 * to [channel] (the `subscribe` variants) or handed as a batch to a caller-supplied
 * [KafkaConsumerRecordsFunction] (the `consume` variants).
 */
open class KafkaMessageConsumerService(
    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
) : BlueprintMessageConsumerService {

    val log = logger(KafkaMessageConsumerService::class)

    /** Channel that receives the decoded value of every record polled by the `subscribe` variants. */
    val channel = Channel<String>()

    /** Most recently created consumer; null until `subscribe` or `consume` has been called. */
    var kafkaConsumer: Consumer<String, ByteArray>? = null

    /** Polling-loop flag: the polling thread keeps polling while this is true. Written by [shutDown]. */
    @Volatile
    var keepGoing = true

    /**
     * Builds a new Kafka consumer from the configured properties.
     *
     * @param additionalConfig entries added to, or overriding, the configured properties.
     * @return a new, not yet subscribed consumer.
     */
    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
        val configProperties = messageConsumerProperties.getConfig()
        /** add or override already set properties */
        additionalConfig?.let { configProperties.putAll(it) }
        /** Create Kafka consumer */
        return KafkaConsumer(configProperties)
    }

    /**
     * Subscribes to the comma-separated topics configured in the consumer properties.
     *
     * @throws IllegalStateException when no usable topic name is configured.
     */
    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
        return subscribe(configuredTopics(), additionalConfig)
    }

    /**
     * Subscribes to [topics] and starts a polling thread that forwards every non-null record value,
     * decoded as a string, to [channel].
     *
     * @param topics topic names to subscribe to.
     * @param additionalConfig entries added to, or overriding, the configured consumer properties.
     * @return the [channel] that receives the decoded messages.
     */
    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
        val consumer = createSubscribedConsumer(topics, additionalConfig)

        // Reset the flag before the thread starts so that a shutDown() issued right after this
        // method returns cannot be overwritten by the polling thread.
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            // use {} closes the consumer when the loop ends or a poll fails.
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    // Block the polling thread until every record of this batch has been handed over.
                    runBlocking {
                        consumerRecords.forEach { consumerRecord ->
                            /** execute the command block */
                            consumerRecord.value()?.let { payload ->
                                launch {
                                    if (!channel.isClosedForSend) {
                                        // NOTE(review): decodes with the JVM default charset; confirm that
                                        // producers use the same encoding.
                                        channel.send(String(payload, Charset.defaultCharset()))
                                    } else {
                                        log.error("Channel is closed to receive message")
                                    }
                                }
                            }
                        }
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
        return channel
    }

    /**
     * Consumes the comma-separated topics configured in the consumer properties.
     *
     * @throws IllegalStateException when no usable topic name is configured.
     */
    override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
        return consume(
            topics = configuredTopics(),
            additionalConfig = additionalConfig,
            consumerFunction = consumerFunction
        )
    }

    /**
     * Subscribes to [topics] and starts a polling thread that passes every polled batch to
     * [consumerFunction].
     *
     * @param topics topic names to subscribe to.
     * @param additionalConfig entries added to, or overriding, the configured consumer properties.
     * @param consumerFunction must be a [KafkaConsumerRecordsFunction]; any other type fails the cast.
     */
    override suspend fun consume(
        topics: List<String>,
        additionalConfig: Map<String, Any>?,
        consumerFunction: ConsumerFunction
    ) {
        val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
        val consumer = createSubscribedConsumer(topics, additionalConfig)

        // Same ordering as in subscribe(): reset the flag before the polling thread exists.
        keepGoing = true
        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
            consumer.use { kc ->
                while (keepGoing) {
                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
                    log.trace("Consumed Records : ${consumerRecords.count()}")
                    runBlocking {
                        /** Execute dynamic consumer Block substitution */
                        kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
                    }
                }
                log.info("message listener shutting down.....")
            }
        }
    }

    /**
     * Stops the polling loop, cancels [channel] and then waits one poll interval so that a poll
     * already in progress can return.
     */
    override suspend fun shutDown() {
        /** stop the polling loop */
        keepGoing = false
        /** Close the Channel; cancel() also releases senders still suspended on the rendezvous channel */
        channel.cancel()
        /** TO shutdown gracefully, need to wait for the maximum poll time */
        delay(messageConsumerProperties.pollMillSec)
    }

    /** Splits the configured comma-separated topic list, dropping blank entries. */
    private fun configuredTopics(): List<String> {
        val topicNames = messageConsumerProperties.topic
            ?.split(",")
            ?.map { it.trim() }
            ?.filter { it.isNotEmpty() }
            .orEmpty()
        check(topicNames.isNotEmpty()) { "couldn't get topic information" }
        return topicNames
    }

    /** Creates a consumer, stores it in [kafkaConsumer] and subscribes it to [topics]. */
    private fun createSubscribedConsumer(
        topics: List<String>,
        additionalConfig: Map<String, Any>?
    ): Consumer<String, ByteArray> {
        /** Create Kafka consumer */
        kafkaConsumer = kafkaConsumer(additionalConfig)

        val consumer = checkNotNull(kafkaConsumer) {
            "failed to create kafka consumer for " +
                "server(${messageConsumerProperties.bootstrapServers})'s " +
                "topics($topics)"
        }

        consumer.subscribe(topics)
        log.info("Successfully consumed topic($topics)")
        return consumer
    }
}