Change handling of offset metric for consumer app 36/90836/1
authorkjaniak <kornel.janiak@nokia.com>
Wed, 3 Jul 2019 11:31:31 +0000 (13:31 +0200)
committerkjaniak <kornel.janiak@nokia.com>
Wed, 3 Jul 2019 11:31:31 +0000 (13:31 +0200)
Change-Id: Ia83469783d64d0174624489df671006e72bc8948
Issue-ID: DCAEGEN2-1635
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt

index e576a88..55fae45 100644 (file)
@@ -19,7 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
 
+import org.apache.kafka.common.TopicPartition
+
 interface Metrics {
-    fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0)
+    fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition)
     fun notifyMessageTravelTime(messageSentTimeMicros: Long)
 }
index da1225e..0af2cb2 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
 
+import arrow.syntax.function.memoize
+import io.micrometer.core.instrument.Tag
 import io.micrometer.core.instrument.Timer
 import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.apache.kafka.common.TopicPartition
 import org.onap.dcae.collectors.veshv.utils.TimeUtils
 import reactor.core.publisher.Mono
 import java.time.Duration
@@ -32,8 +35,11 @@ internal class MicrometerMetrics constructor(
         private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
 ) : Metrics {
 
-    private val currentOffset = registry.gauge(name("consumer.offset"), AtomicLong(0))
-    private val travelTime = Timer.builder(name("travel.time"))
+    private val currentOffsetByTopicPartition = { topic: String ->
+        registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0))
+    }.memoize<String, AtomicLong>()
+
+    private val travelTime = Timer.builder(name(TRAVEL,TIME))
             .publishPercentileHistogram(true)
             .register(registry)
 
@@ -41,9 +47,8 @@ internal class MicrometerMetrics constructor(
         registry.scrape()
     }
 
-    override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) {
-        // TODO use topic and partition
-        currentOffset.lazySet(offset)
+    override fun notifyOffsetChanged(offset: Long, topicPartition: TopicPartition) {
+        currentOffsetByTopicPartition(topicPartition.toString()).set(offset)
     }
 
     override fun notifyMessageTravelTime(messageSentTimeMicros: Long) {
@@ -51,8 +56,13 @@ internal class MicrometerMetrics constructor(
     }
 
     companion object {
-        val INSTANCE by lazy { MicrometerMetrics() }
 
+        val INSTANCE by lazy { MicrometerMetrics() }
+        private const val CONSUMER = "consumer"
+        private const val OFFSET = "offset"
+        private const val TOPIC = "topic"
+        private const val TRAVEL = "travel"
+        private const val TIME = "time"
         private const val PREFIX = "hv-kafka-consumer"
         private fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
     }
index 1481a22..57a5f33 100644 (file)
@@ -28,14 +28,11 @@ internal class OffsetConsumer(private val metrics: Metrics) {
 
     fun update(topicPartition: TopicPartition, offset: Long) {
         logger.trace {
-            "Current consumer offset $offset for topic ${topicPartition.topic()} " +
-                    "on partition ${topicPartition.partition()}"
+            "Current consumer offset $offset for topic partition $topicPartition"
         }
-        metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition())
+        metrics.notifyOffsetChanged(offset, topicPartition)
     }
 
-    fun reset() = Unit
-
     companion object {
         val logger = Logger(OffsetConsumer::class)
     }
index 96ba588..93a39ae 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
 
 import io.micrometer.prometheus.PrometheusConfig
 import io.micrometer.prometheus.PrometheusMeterRegistry
+import org.apache.kafka.common.TopicPartition
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.data.Percentage
 import org.jetbrains.spek.api.Spek
@@ -68,13 +69,14 @@ object MicrometerMetricsTest : Spek({
     }
 
     describe("Gauges") {
-        val gaugeName = "$PREFIX.consumer.offset"
+        val gaugeName = "$PREFIX.consumer.offset.topic"
 
         on("notifyOffsetChanged") {
             val offset = 966L
+            val topicPartition = TopicPartition("sample_topic", 1)
 
             it("should update $gaugeName") {
-                cut.notifyOffsetChanged(offset, "sample_topic", 1)
+                cut.notifyOffsetChanged(offset, topicPartition)
 
                 registry.verifyGauge(gaugeName) {
                     assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
index 242f27b..5ccb483 100644 (file)
@@ -40,7 +40,7 @@ object OffsetConsumerTest : Spek({
                 offsetConsumer.update(topicPartition, newOffset)
 
                 it("should notify message newOffset metric") {
-                    verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber)
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition)
                 }
             }
         }