/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2019 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.dcae.collectors.veshv.kafkaconsumer.impl

import com.nhaarman.mockitokotlin2.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.test.setMain
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics

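/**
 * Spek specification for OffsetKafkaConsumer.
 *
 * The cases below describe only the behaviour observable through the mocked collaborators:
 * the consumer is expected to read end offsets for its assigned partitions from the mocked
 * KafkaConsumer and report them through Metrics.notifyOffsetChanged on every update interval.
 */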
@ExperimentalCoroutinesApi
object OffsetKafkaConsumerTest : Spek({
    given("OffsetKafkaConsumer") {
        val testDispatcher = TestCoroutineDispatcher()
        val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
        val mockedMetrics = mock<Metrics>()

        afterEachTest {
            testDispatcher.cleanupTestCoroutines()
            // the mocks are shared between test cases, so clear their stubbing and
            // recorded interactions after every test
            reset(mockedKafkaConsumer, mockedMetrics)
        }

        given("single topicName and partition") {
            val topicName = "topicName"
            val topics = setOf(topicName)
            val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)

            on("started OffsetKafkaConsumer") {
                val topicPartition = createTopicPartition(topicName)
                val topicPartitions = setOf(topicPartition)
                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
                        .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))

                runBlockingTest(testDispatcher) {
                    val job = offsetKafkaConsumer.start(updateIntervalInMs)
                    job.cancelAndJoin()
                }

                it("should notify offset changed with $topicName") {
                    verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition)
                }
            }
        }

        given("two topics with partition") {
            val topics = setOf(topicName1, topicName2)
            val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)

            on("started OffsetKafkaConsumer for two iterations of the while loop") {
                val offsetArgumentCaptor = argumentCaptor<Long>()
                val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()

                val topicPartition1 = createTopicPartition(topicName1)
                val topicPartition2 = createTopicPartition(topicName2)
                val topicPartitions = setOf(topicPartition1, topicPartition2)

                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)

                // consecutive endOffsets() calls return different maps, so each loop iteration
                // should report a different offset for every partition
                val partitionToOffset1 =
                        mapOf(topicPartition1 to newOffset,
                                topicPartition2 to anotherNewOffset)
                val partitionToOffset2 =
                        mapOf(topicPartition1 to anotherNewOffset,
                                topicPartition2 to newOffset)
                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
                        .thenReturn(partitionToOffset1, partitionToOffset2)

                runBlockingTest(testDispatcher) {
                    val job = kafkaSource.start(updateIntervalInMs)
                    verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged(
                            offsetArgumentCaptor.capture(),
                            topicPartitionArgumentCaptor.capture()
                    )

                    it("should notify offset changed with proper arguments - before interval") {
                        assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
                        assertThat(topicPartitionArgumentCaptor.firstValue.topic())
                                .isEqualToIgnoringCase(topicPartition1.topic())
                        assertThat(topicPartitionArgumentCaptor.firstValue.partition())
                                .isEqualTo(topicPartition1.partition())

                        assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
                        assertThat(topicPartitionArgumentCaptor.secondValue.topic())
                                .isEqualToIgnoringCase(topicPartition2.topic())
                        assertThat(topicPartitionArgumentCaptor.secondValue.partition())
                                .isEqualTo(topicPartition2.partition())
                    }

                    // clear the recorded interactions so the verification after the interval
                    // again expects exactly one notification per topic partition
                    reset(mockedMetrics)
                    advanceTimeBy(updateIntervalInMs)

                    verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged(
                            offsetArgumentCaptor.capture(),
                            topicPartitionArgumentCaptor.capture()
                    )

                    it("should notify offset changed with proper arguments - after interval") {
                        assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset)
                        assertThat(topicPartitionArgumentCaptor.thirdValue.topic())
                                .isEqualToIgnoringCase(topicPartition1.topic())
                        assertThat(topicPartitionArgumentCaptor.thirdValue.partition())
                                .isEqualTo(topicPartition1.partition())

                        assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset)
                        assertThat(topicPartitionArgumentCaptor.lastValue.topic())
                                .isEqualToIgnoringCase(topicPartition2.topic())
                        assertThat(topicPartitionArgumentCaptor.lastValue.partition())
                                .isEqualTo(topicPartition2.partition())
                    }

                    job.cancelAndJoin()
                }
            }
        }

        given("empty topicName list") {
            val emptyTopics = emptySet<String>()
            val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher)

            on("call of the start function") {
                val emptyTopicPartitions = setOf(null)
                whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
                whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
                        .thenReturn(emptyMap())

                runBlockingTest(testDispatcher) {
                    val job = offsetKafkaConsumer.start(updateIntervalInMs)
                    job.cancelAndJoin()
                }

                it("should not interact with metrics") {
                    verifyZeroInteractions(mockedMetrics)
                }
            }
        }
    }
})

private const val updateIntervalInMs = 10L
private const val partitionNumber = 0
private const val newOffset = 2L
private const val anotherNewOffset = 10L
private const val topicName1 = "topicName1"
private const val topicName2 = "topicName2"
private const val topicsAmount = 2

fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
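
// For orientation only: the interactions verified above suggest an update loop of roughly the
// shape sketched below. This is an assumption for illustration, not the actual
// OffsetKafkaConsumer implementation, which lives in the main source set and may differ
// (e.g. in error handling, poll timeouts or scope management):
//
//     fun start(updateIntervalInMs: Long): Job = CoroutineScope(dispatcher).launch {
//         while (isActive) {
//             kafkaConsumer.endOffsets(kafkaConsumer.assignment()).forEach { (partition, offset) ->
//                 metrics.notifyOffsetChanged(offset, partition)
//             }
//             delay(updateIntervalInMs)
//         }
//     }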