Make offset consumer more generic
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-kafka-consumer / src / test / kotlin / org / onap / dcae / collectors / veshv / kafkaconsumer / impl / OffsetKafkaConsumerTest.kt
index 26616f1..faa700b 100644 (file)
@@ -55,11 +55,11 @@ object OffsetKafkaConsumerTest : Spek({
             val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
 
             on("started OffsetKafkaConsumer") {
-                val topicPartition = createTopicPartition(topicName)
+                val topicPartition = createTopicPartition(topicName, 0)
                 val topicPartitions = setOf(topicPartition)
                 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
                 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
-                        .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
+                        .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset1))
 
                 runBlockingTest(testDispatcher) {
                     val job = offsetKafkaConsumer.start(updateIntervalInMs)
@@ -67,7 +67,35 @@ object OffsetKafkaConsumerTest : Spek({
                 }
 
                 it("should notify offset changed with $topicName") {
-                    verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition)
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition)
+                }
+            }
+        }
+
+        given("single topicName and multiple partitions") {
+            val topicName = "topicName"
+            val topics = setOf(topicName)
+            val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
+
+            on("started OffsetKafkaConsumer") {
+                val topicPartition1 = createTopicPartition(topicName, 0)
+                val topicPartition2 = createTopicPartition(topicName, 2)
+                val topicPartition3 = createTopicPartition(topicName, 3)
+                val topicPartitions = setOf(topicPartition1, topicPartition2, topicPartition3)
+                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+                        .thenReturn(mapOf<TopicPartition, Long>(
+                                topicPartition1 to newOffset1, topicPartition2 to newOffset2, topicPartition3 to newOffset3))
+
+                runBlockingTest(testDispatcher) {
+                    val job = offsetKafkaConsumer.start(updateIntervalInMs)
+                    job.cancelAndJoin()
+                }
+
+                it("should notify offset changed with $topicName") {
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset1, topicPartition1)
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset2, topicPartition2)
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset3, topicPartition3)
                 }
             }
         }
@@ -80,18 +108,18 @@ object OffsetKafkaConsumerTest : Spek({
                 val offsetArgumentCaptor = argumentCaptor<Long>()
                 val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
 
-                val topicPartition1 = createTopicPartition(topicName1)
-                val topicPartition2 = createTopicPartition(topicName2)
+                val topicPartition1 = createTopicPartition(topicName1, 0)
+                val topicPartition2 = createTopicPartition(topicName2, 0)
                 val topicPartitions = setOf(topicPartition1, topicPartition2)
 
                 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
 
                 val partitionToOffset1 =
-                        mapOf(topicPartition1 to newOffset,
-                                topicPartition2 to anotherNewOffset)
+                        mapOf(topicPartition1 to newOffset1,
+                                topicPartition2 to newOffset2)
                 val partitionToOffset2 =
-                        mapOf(topicPartition1 to anotherNewOffset,
-                                topicPartition2 to newOffset)
+                        mapOf(topicPartition1 to newOffset2,
+                                topicPartition2 to newOffset1)
                 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
                         .thenReturn(partitionToOffset1, partitionToOffset2)
 
@@ -103,13 +131,13 @@ object OffsetKafkaConsumerTest : Spek({
                     )
 
                     it("should notify offset changed with proper arguments - before interval"){
-                        assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
+                        assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset1)
                         assertThat(topicPartitionArgumentCaptor.firstValue.topic())
                                 .isEqualToIgnoringCase(topicPartition1.topic())
                         assertThat(topicPartitionArgumentCaptor.firstValue.partition())
                                 .isEqualTo(topicPartition1.partition())
 
-                        assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
+                        assertThat(offsetArgumentCaptor.secondValue).isEqualTo(newOffset2)
                         assertThat(topicPartitionArgumentCaptor.secondValue.topic())
                                 .isEqualToIgnoringCase(topicPartition2.topic())
                         assertThat(topicPartitionArgumentCaptor.secondValue.partition())
@@ -126,13 +154,13 @@ object OffsetKafkaConsumerTest : Spek({
                     )
 
                     it("should notify offset changed with proper arguments - after interval") {
-                        assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset)
+                        assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(newOffset2)
                         assertThat(topicPartitionArgumentCaptor.thirdValue.topic())
                                 .isEqualToIgnoringCase(topicPartition1.topic())
                         assertThat(topicPartitionArgumentCaptor.thirdValue.partition())
                                 .isEqualTo(topicPartition1.partition())
 
-                        assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset)
+                        assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset1)
                         assertThat(topicPartitionArgumentCaptor.lastValue.topic())
                                 .isEqualToIgnoringCase(topicPartition2.topic())
                         assertThat(topicPartitionArgumentCaptor.lastValue.partition())
@@ -167,10 +195,10 @@ object OffsetKafkaConsumerTest : Spek({
 })
 
 private const val updateIntervalInMs = 10L
-private const val partitionNumber = 0
-private const val newOffset = 2L
-private const val anotherNewOffset = 10L
+private const val newOffset1 = 2L
+private const val newOffset2 = 10L
+private const val newOffset3 = 125L
 private const val topicName1 = "topicName1"
 private const val topicName2 = "topicName2"
 private const val topicsAmount = 2
-fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file
+fun createTopicPartition(topic: String, number: Int) = TopicPartition(topic, number)