Add partition offset metric to each topic partition 68/96168/3
authorkjaniak <kornel.janiak@nokia.com>
Tue, 24 Sep 2019 07:46:24 +0000 (09:46 +0200)
committerkjaniak <kornel.janiak@nokia.com>
Thu, 26 Sep 2019 12:34:14 +0000 (14:34 +0200)
Before this commit offset consumer was able to fetch offset just from one partition.
This commit solve this.

Change-Id: I2c2c300219e43ab422b237094ad775ca8795169e
Issue-ID: DCAEGEN2-1783
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
tools/performance/cloud/cloud-based-performance-test.sh
tools/performance/cloud/test.properties
tools/performance/local/grafana/dashboards/performance_tests.json

index 18de6fc..52bcf1e 100644 (file)
@@ -37,21 +37,23 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
                                    private val topics: Set<String>,
                                    private val metrics: Metrics,
                                    private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
-    : MetricsKafkaConsumer{
+    : MetricsKafkaConsumer {
 
-    override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
+    override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
             GlobalScope.launch(dispatcher) {
-                kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
-                    while (isActive) {
-                        val topicPartitions = kafkaConsumer.assignment()
+                val topicPartitions = topics.flatMap {
+                    listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
+                }
+                kafkaConsumer.assign(topicPartitions)
 
-                        kafkaConsumer.endOffsets(topicPartitions)
-                                .forEach { (topicPartition, offset) ->
-                                    update(topicPartition, offset)
-                                }
-                        kafkaConsumer.commitSync()
-                        delay(updateInterval)
-                    }
+                while (isActive) {
+                    kafkaConsumer.endOffsets(kafkaConsumer.assignment())
+                            .forEach { (topicPartition, offset) ->
+                                update(topicPartition, offset)
+                            }
+                    kafkaConsumer.commitSync()
+                    delay(updateInterval)
+                }
             }
 
     private fun update(topicPartition: TopicPartition, offset: Long) {
index 0af2cb2..da6a467 100644 (file)
@@ -35,8 +35,10 @@ internal class MicrometerMetrics constructor(
         private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
 ) : Metrics {
 
-    private val currentOffsetByTopicPartition = { topic: String ->
-        registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0))
+    private val currentOffsetByTopicPartition = { topicPartition: String ->
+        registry.gauge(name(OFFSET, PARTITION, topicPartition.toLowerCase()),
+                listOf(Tag.of(PARTITION, topicPartition)),
+                AtomicLong(0))
     }.memoize<String, AtomicLong>()
 
     private val travelTime = Timer.builder(name(TRAVEL,TIME))
@@ -58,9 +60,8 @@ internal class MicrometerMetrics constructor(
     companion object {
 
         val INSTANCE by lazy { MicrometerMetrics() }
-        private const val CONSUMER = "consumer"
         private const val OFFSET = "offset"
-        private const val TOPIC = "topic"
+        private const val PARTITION = "partition"
         private const val TRAVEL = "travel"
         private const val TIME = "time"
         private const val PREFIX = "hv-kafka-consumer"
index 3bb9eba..26616f1 100644 (file)
@@ -72,7 +72,7 @@ object OffsetKafkaConsumerTest : Spek({
             }
         }
 
-        given("two topics with partition") {
+        given("two topics with one partition each") {
             val topics = setOf(topicName1, topicName2)
             val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
 
@@ -147,7 +147,7 @@ object OffsetKafkaConsumerTest : Spek({
             val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher)
 
             on("call of function start") {
-                val emptyTopicPartitions = setOf(null)
+                val emptyTopicPartitions = emptySet<TopicPartition>()
                 whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
                 whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
                         .thenReturn(emptyMap())
index 93a39ae..cfe67df 100644 (file)
@@ -69,17 +69,34 @@ object MicrometerMetricsTest : Spek({
     }
 
     describe("Gauges") {
-        val gaugeName = "$PREFIX.consumer.offset.topic"
+        val gaugeName1 = "$PREFIX.offset.partition.sample_topic-0"
+        val gaugeName2 = "$PREFIX.offset.partition.sample_topic-1"
+        val offset1 = 966L
+        val offset2 = 967L
+        val topicPartition1 = TopicPartition("sample_topic", 0)
+        val topicPartition2 = TopicPartition("sample_topic", 1)
 
         on("notifyOffsetChanged") {
-            val offset = 966L
-            val topicPartition = TopicPartition("sample_topic", 1)
+            it("should update $gaugeName1") {
+                cut.notifyOffsetChanged(offset1, topicPartition1)
 
-            it("should update $gaugeName") {
-                cut.notifyOffsetChanged(offset, topicPartition)
+                registry.verifyGauge(gaugeName1) {
+                    assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision)
+                }
+            }
+        }
+
+        on("two partition update") {
+            it("should update $gaugeName1") {
+                cut.notifyOffsetChanged(offset1, topicPartition1)
+                cut.notifyOffsetChanged(offset2, topicPartition2)
+
+                registry.verifyGauge(gaugeName1) {
+                    assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision)
+                }
 
-                registry.verifyGauge(gaugeName) {
-                    assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
+                registry.verifyGauge(gaugeName2) {
+                    assertThat(it.value()).isCloseTo(offset2.toDouble(), doublePrecision)
                 }
             }
         }
index cc08f4b..300bf20 100755 (executable)
@@ -77,7 +77,7 @@ function usage() {
     echo "./cloud-based-performance-test.sh setup"
     echo "./cloud-based-performance-test.sh start"
     echo "./cloud-based-performance-test.sh start --containers 10"
-    echo "./cloud-based-performance-test.sh start --containers 10"
+    echo "./cloud-based-performance-test.sh start --properties-file ~/other_test.properties"
     echo "./cloud-based-performance-test.sh clean"
     exit 1
 }
index bd746a1..04169e3 100644 (file)
@@ -14,6 +14,6 @@ producer.message.interval=0
 # CONSUMER CONFIGURATION
 
 # Addresses of Kafka services to consume from
-consumer.kafka.bootstrapServers=message-router-kafka-0:9093,message-router-kafka-1:9093,message-router-kafka-2:9093
+consumer.kafka.bootstrapServers=message-router-kafka:9092
 # Kafka topics to subscribe to
 consumer.kafka.topics=HV_VES_PERF3GPP
index 3784a96..654fdc3 100644 (file)
       "tableColumn": "",
       "targets": [
         {
-          "expr": "hv_kafka_consumer_consumer_offset_topic",
+          "expr": "hv_kafka_consumer_offset_partition_hv_ves_perf3gpp_0",
           "format": "time_series",
           "intervalFactor": 1,
           "refId": "A"
       "thresholds": "",
       "timeFrom": null,
       "timeShift": null,
-      "title": "Current offset",
+      "title": "Current offset on partition 0",
       "type": "singlestat",
       "valueFontSize": "80%",
       "valueMaps": [