Make offset consumer more generic 48/106748/3
author: Joanna Jeremicz <joanna.jeremicz@nokia.com>
Tue, 28 Apr 2020 11:25:46 +0000 (13:25 +0200)
committer: Joanna Jeremicz <joanna.jeremicz@nokia.com>
Thu, 30 Apr 2020 11:16:40 +0000 (13:16 +0200)
Offsets are now read correctly regardless of the number of partitions on the topic.

Issue-ID: DCAEGEN2-1576
Change-Id: Id89bebbcf1b6181a2f9fd5a4a4600329d046c5e5
Signed-off-by: Joanna Jeremicz <joanna.jeremicz@nokia.com>
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/CommandLineOption.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/config/ArgKafkaConsumerConfigurationTest.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt

index 52bcf1e..1899915 100644 (file)
@@ -41,9 +41,10 @@ internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<Byte
 
     override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
             GlobalScope.launch(dispatcher) {
-                val topicPartitions = topics.flatMap {
-                    listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
-                }
+
+                val topicPartitions = topics.flatMap(kafkaConsumer::partitionsFor)
+                        .map { TopicPartition(it.topic(), it.partition()) }
+
                 kafkaConsumer.assign(topicPartitions)
 
                 while (isActive) {
index 26616f1..faa700b 100644 (file)
@@ -55,11 +55,11 @@ object OffsetKafkaConsumerTest : Spek({
             val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
 
             on("started OffsetKafkaConsumer") {
-                val topicPartition = createTopicPartition(topicName)
+                val topicPartition = createTopicPartition(topicName, 0)
                 val topicPartitions = setOf(topicPartition)
                 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
                 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
-                        .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
+                        .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset1))
 
                 runBlockingTest(testDispatcher) {
                     val job = offsetKafkaConsumer.start(updateIntervalInMs)
@@ -67,7 +67,35 @@ object OffsetKafkaConsumerTest : Spek({
                 }
 
                 it("should notify offset changed with $topicName") {
-                    verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition)
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition)
+                }
+            }
+        }
+
+        given("single topicName and multiple partitions") {
+            val topicName = "topicName"
+            val topics = setOf(topicName)
+            val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
+
+            on("started OffsetKafkaConsumer") {
+                val topicPartition1 = createTopicPartition(topicName, 0)
+                val topicPartition2 = createTopicPartition(topicName, 2)
+                val topicPartition3 = createTopicPartition(topicName, 3)
+                val topicPartitions = setOf(topicPartition1, topicPartition2, topicPartition3)
+                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+                        .thenReturn(mapOf<TopicPartition, Long>(
+                                topicPartition1 to newOffset1, topicPartition2 to newOffset2, topicPartition3 to newOffset3))
+
+                runBlockingTest(testDispatcher) {
+                    val job = offsetKafkaConsumer.start(updateIntervalInMs)
+                    job.cancelAndJoin()
+                }
+
+                it("should notify offset changed with $topicName") {
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition1)
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset2, topicPartition2)
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset3, topicPartition3)
                 }
             }
         }
@@ -80,18 +108,18 @@ object OffsetKafkaConsumerTest : Spek({
                 val offsetArgumentCaptor = argumentCaptor<Long>()
                 val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
 
-                val topicPartition1 = createTopicPartition(topicName1)
-                val topicPartition2 = createTopicPartition(topicName2)
+                val topicPartition1 = createTopicPartition(topicName1, 0)
+                val topicPartition2 = createTopicPartition(topicName2, 0)
                 val topicPartitions = setOf(topicPartition1, topicPartition2)
 
                 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
 
                 val partitionToOffset1 =
-                        mapOf(topicPartition1 to newOffset,
-                                topicPartition2 to anotherNewOffset)
+                        mapOf(topicPartition1 to newOffset1,
+                                topicPartition2 to newOffset2)
                 val partitionToOffset2 =
-                        mapOf(topicPartition1 to anotherNewOffset,
-                                topicPartition2 to newOffset)
+                        mapOf(topicPartition1 to newOffset2,
+                                topicPartition2 to newOffset1)
                 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
                         .thenReturn(partitionToOffset1, partitionToOffset2)
 
@@ -103,13 +131,13 @@ object OffsetKafkaConsumerTest : Spek({
                     )
 
                     it("should notify offset changed with proper arguments - before interval"){
-                        assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
+                        assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset1)
                         assertThat(topicPartitionArgumentCaptor.firstValue.topic())
                                 .isEqualToIgnoringCase(topicPartition1.topic())
                         assertThat(topicPartitionArgumentCaptor.firstValue.partition())
                                 .isEqualTo(topicPartition1.partition())
 
-                        assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
+                        assertThat(offsetArgumentCaptor.secondValue).isEqualTo(newOffset2)
                         assertThat(topicPartitionArgumentCaptor.secondValue.topic())
                                 .isEqualToIgnoringCase(topicPartition2.topic())
                         assertThat(topicPartitionArgumentCaptor.secondValue.partition())
@@ -126,13 +154,13 @@ object OffsetKafkaConsumerTest : Spek({
                     )
 
                     it("should notify offset changed with proper arguments - after interval") {
-                        assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset)
+                        assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(newOffset2)
                         assertThat(topicPartitionArgumentCaptor.thirdValue.topic())
                                 .isEqualToIgnoringCase(topicPartition1.topic())
                         assertThat(topicPartitionArgumentCaptor.thirdValue.partition())
                                 .isEqualTo(topicPartition1.partition())
 
-                        assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset)
+                        assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset1)
                         assertThat(topicPartitionArgumentCaptor.lastValue.topic())
                                 .isEqualToIgnoringCase(topicPartition2.topic())
                         assertThat(topicPartitionArgumentCaptor.lastValue.partition())
@@ -167,10 +195,10 @@ object OffsetKafkaConsumerTest : Spek({
 })
 
 private const val updateIntervalInMs = 10L
-private const val partitionNumber = 0
-private const val newOffset = 2L
-private const val anotherNewOffset = 10L
+private const val newOffset1 = 2L
+private const val newOffset2 = 10L
+private const val newOffset3 = 125L
 private const val topicName1 = "topicName1"
 private const val topicName2 = "topicName2"
 private const val topicsAmount = 2
-fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file
+fun createTopicPartition(topic: String, number: Int) = TopicPartition(topic, number)