2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
22 import com.nhaarman.mockitokotlin2.argumentCaptor
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.times
25 import com.nhaarman.mockitokotlin2.verify
26 import com.nhaarman.mockitokotlin2.verifyZeroInteractions
27 import com.nhaarman.mockitokotlin2.whenever
28 import kotlinx.coroutines.ExperimentalCoroutinesApi
29 import kotlinx.coroutines.cancelAndJoin
30 import kotlinx.coroutines.test.TestCoroutineDispatcher
31 import kotlinx.coroutines.test.runBlockingTest
32 import org.apache.kafka.clients.consumer.KafkaConsumer
33 import org.apache.kafka.common.TopicPartition
34 import org.assertj.core.api.Assertions.assertThat
35 import org.jetbrains.spek.api.Spek
36 import org.jetbrains.spek.api.dsl.given
37 import org.jetbrains.spek.api.dsl.it
38 import org.jetbrains.spek.api.dsl.on
39 import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
/**
 * Spek specification for [KafkaSource].
 *
 * Every scenario shares one mocked [KafkaConsumer] and one [TestCoroutineDispatcher]; each nested
 * `given` builds its own [KafkaSource] and its own mocked [OffsetConsumer].
 *
 * NOTE(review): this listing looks incomplete - closing braces and a few statements seem to be
 * missing (e.g. nothing visible uses `job`, `runBlockingTest` or `cancelAndJoin`). Confirm against
 * version control before changing the structure.
 */
41 @ExperimentalCoroutinesApi
42 object KafkaSourceTest : Spek({
43 given("KafkaSource") {
// Fixtures shared by every nested scenario below.
44 val testDispatcher = TestCoroutineDispatcher()
45 val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
// NOTE(review): presumably wrapped in a Spek lifecycle hook whose opening line is not visible here - confirm.
47 testDispatcher.cleanupTestCoroutines()
// Scenario 1: one topic, one partition.
49 given("single topicName and partition") {
50 val topics = setOf("topicName")
51 val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
52 val mockedOffsetConsumer = mock<OffsetConsumer>()
53 on("started KafkaSource") {
54 val topicPartition = createTopicPartition("topicName")
55 val topicPartitions = setOf(topicPartition)
// Stub the consumer: one assigned partition whose end offset is newOffset.
56 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
57 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
58 .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
// NOTE(review): `job` is never used in the visible lines; presumably cancelled on a line missing from this listing - confirm.
61 val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
65 it("should call update function on topicName") {
66 verify(mockedOffsetConsumer).update(topicPartition, newOffset)
// Scenario 2: two topics, checked before and after one update interval elapses.
71 given("two topics with partition") {
72 val topics = setOf(topicName1, topicName2)
73 val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
74 val mockedOffsetConsumer = mock<OffsetConsumer>()
76 on("started KafkaSource for two iteration of while loop") {
// Separate captor pairs for the verification before and after the interval.
77 val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
78 val offsetArgumentCaptor = argumentCaptor<Long>()
79 val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>()
80 val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>()
81 val topicPartition1 = createTopicPartition(topicName1)
82 val topicPartition2 = createTopicPartition(topicName2)
83 val topicPartitions = setOf(topicPartition1, topicPartition2)
84 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
// Offsets for the first endOffsets() call...
85 val partitionToOffset1 =
86 mapOf(topicPartition1 to newOffset,
87 topicPartition2 to anotherNewOffset)
// ...and, with the two offsets swapped, for the following call(s).
88 val partitionToOffset2 =
89 mapOf(topicPartition1 to anotherNewOffset,
90 topicPartition2 to newOffset)
// Consecutive stubbing: the first call returns partitionToOffset1, later calls partitionToOffset2.
91 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
92 .thenReturn(partitionToOffset1, partitionToOffset2)
95 val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
// Before virtual time advances: expect topicsAmount update() calls, capturing their arguments.
96 verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(),
97 offsetArgumentCaptor.capture())
// Move the dispatcher's virtual clock forward by one update interval.
99 testDispatcher.advanceTimeBy(updateIntervalInMs)
// The expected count is cumulative (all update() calls recorded so far), so these captors hold
// every call; the assertions below read the third and last captured values for the post-interval round.
101 verify(mockedOffsetConsumer, times(topicsAmountAfterInterval))
102 .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture())
104 it("should calls update function with proper arguments - before interval") {
105 assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1)
106 assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
107 assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2)
108 assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
110 it("should calls update function with proper arguments - after interval") {
111 assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1)
112 assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset)
113 assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2)
114 assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset)
// Scenario 3: no topics configured, so the OffsetConsumer must never be touched.
121 given("empty topicName list") {
122 val emptyTopics = emptySet<String>()
123 val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher)
124 val mockedOffsetConsumer = mock<OffsetConsumer>()
126 on("call of function start") {
// NOTE(review): setOf(null) is a one-element set containing null, not an empty set;
// emptySet<TopicPartition>() looks like what was intended - confirm and fix.
127 val emptyTopicPartitions = setOf(null)
128 whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
129 whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
130 .thenReturn(emptyMap())
133 val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
137 it("should not interact with OffsetConsumer") {
138 verifyZeroInteractions(mockedOffsetConsumer)
// Interval handed to KafkaSource.start() and used to advance the test dispatcher's virtual clock.
private const val updateIntervalInMs: Long = 10L

// Partition index used for every TopicPartition built by createTopicPartition().
private const val partitionNumber: Int = 0

// Topic names for the two-topic scenario.
private const val topicName1: String = "topicName1"
private const val topicName2: String = "topicName2"

// End offsets returned by the stubbed KafkaConsumer.endOffsets().
private const val newOffset: Long = 2L
private const val anotherNewOffset: Long = 10L

// Expected OffsetConsumer.update() call counts: before the interval elapses, and in total afterwards.
private const val topicsAmount: Int = 2
private const val topicsAmountAfterInterval: Int = 4
/**
 * Builds a [TopicPartition] for the given [topic], always on partition [partitionNumber].
 *
 * @param topic name of the Kafka topic
 * @return the topic/partition pair used as a key in the stubbed consumer responses
 */
fun createTopicPartition(topic: String): TopicPartition {
    return TopicPartition(topic, partitionNumber)
}