Added Kafka metrics for CDS workers
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / KafkaMessageConsumerService.kt
index af689a1..004b476 100644 (file)
@@ -17,6 +17,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import io.micrometer.core.instrument.MeterRegistry
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
@@ -24,13 +25,16 @@ import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import java.time.Duration
 import kotlin.concurrent.thread
 
 open class KafkaMessageConsumerService(
-    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
+    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties,
+    private val meterRegistry: MeterRegistry
 ) :
     BlueprintMessageConsumerService {
 
@@ -78,6 +82,10 @@ open class KafkaMessageConsumerService(
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
                             launch {
+                                meterRegistry.counter(
+                                    BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER,
+                                    BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+                                ).increment()
                                 /** execute the command block */
                                 if (!channel.isClosedForSend) {
                                     channel.send(consumerRecord)
@@ -89,6 +97,10 @@ open class KafkaMessageConsumerService(
                                             "key(${consumerRecord.key()})"
                                     )
                                 } else {
+                                    meterRegistry.counter(
+                                        BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+                                        BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+                                    ).increment()
                                     log.error("Channel is closed to receive message")
                                 }
                             }