Added Kafka metrics for CDS workers
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / KafkaMessageConsumerService.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
19
20 import io.micrometer.core.instrument.MeterRegistry
21 import kotlinx.coroutines.channels.Channel
22 import kotlinx.coroutines.delay
23 import kotlinx.coroutines.launch
24 import kotlinx.coroutines.runBlocking
25 import org.apache.kafka.clients.consumer.Consumer
26 import org.apache.kafka.clients.consumer.ConsumerRecord
27 import org.apache.kafka.clients.consumer.KafkaConsumer
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
29 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.logger
32 import java.time.Duration
33 import kotlin.concurrent.thread
34
35 open class KafkaMessageConsumerService(
36     private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties,
37     private val meterRegistry: MeterRegistry
38 ) :
39     BlueprintMessageConsumerService {
40
41     val log = logger(KafkaMessageConsumerService::class)
42     val channel = Channel<ConsumerRecord<String, ByteArray>>()
43     var kafkaConsumer: Consumer<String, ByteArray>? = null
44
45     @Volatile
46     var keepGoing = true
47
48     fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
49         val configProperties = messageConsumerProperties.getConfig()
50         /** add or override already set properties */
51         additionalConfig?.let { configProperties.putAll(it) }
52         /** Create Kafka consumer */
53         return KafkaConsumer(configProperties)
54     }
55
56     override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
57         /** get to topic names */
58         val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
59         check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
60         return subscribe(consumerTopic, additionalConfig)
61     }
62
63     override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
64         /** Create Kafka consumer */
65         kafkaConsumer = kafkaConsumer(additionalConfig)
66
67         checkNotNull(kafkaConsumer) {
68             "failed to create kafka consumer for " +
69                 "server(${messageConsumerProperties.bootstrapServers})'s " +
70                 "topics(${messageConsumerProperties.bootstrapServers})"
71         }
72
73         kafkaConsumer!!.subscribe(topics)
74         log.info("Successfully consumed topic($topics)")
75
76         thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
77             keepGoing = true
78             kafkaConsumer!!.use { kc ->
79                 while (keepGoing) {
80                     val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
81                     log.trace("Consumed Records : ${consumerRecords.count()}")
82                     runBlocking {
83                         consumerRecords?.forEach { consumerRecord ->
84                             launch {
85                                 meterRegistry.counter(
86                                     BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER,
87                                     BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
88                                 ).increment()
89                                 /** execute the command block */
90                                 if (!channel.isClosedForSend) {
91                                     channel.send(consumerRecord)
92                                     log.info(
93                                         "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " +
94                                             "partition(${consumerRecord.partition()}) " +
95                                             "leaderEpoch(${consumerRecord.leaderEpoch().get()}) " +
96                                             "offset(${consumerRecord.offset()}) " +
97                                             "key(${consumerRecord.key()})"
98                                     )
99                                 } else {
100                                     meterRegistry.counter(
101                                         BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
102                                         BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
103                                     ).increment()
104                                     log.error("Channel is closed to receive message")
105                                 }
106                             }
107                         }
108                     }
109                 }
110                 log.info("message listener shutting down.....")
111             }
112         }
113         return channel
114     }
115
116     override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
117         /** get to topic names */
118         val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
119         check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
120         return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction)
121     }
122
123     override suspend fun consume(
124         topics: List<String>,
125         additionalConfig: Map<String, Any>?,
126         consumerFunction: ConsumerFunction
127     ) {
128
129         val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
130
131         /** Create Kafka consumer */
132         kafkaConsumer = kafkaConsumer(additionalConfig)
133
134         checkNotNull(kafkaConsumer) {
135             "failed to create kafka consumer for " +
136                 "server(${messageConsumerProperties.bootstrapServers})'s " +
137                 "topics(${messageConsumerProperties.bootstrapServers})"
138         }
139
140         kafkaConsumer!!.subscribe(topics)
141         log.info("Successfully consumed topic($topics)")
142
143         thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
144             keepGoing = true
145             kafkaConsumer!!.use { kc ->
146                 while (keepGoing) {
147                     val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
148                     log.trace("Consumed Records : ${consumerRecords.count()}")
149                     runBlocking {
150                         /** Execute dynamic consumer Block substitution */
151                         kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
152                     }
153                 }
154                 log.info("message listener shutting down.....")
155             }
156         }
157     }
158
159     override suspend fun shutDown() {
160         /** stop the polling loop */
161         keepGoing = false
162         /** Close the Channel */
163         channel.cancel()
164         /** TO shutdown gracefully, need to wait for the maximum poll time */
165         delay(messageConsumerProperties.pollMillSec)
166     }
167 }