3bb9ebaef7e10e4a312a8b101d188ef782e72483
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
21
22 import com.nhaarman.mockitokotlin2.*
23 import kotlinx.coroutines.CoroutineScope
24 import kotlinx.coroutines.Dispatchers
25 import kotlinx.coroutines.ExperimentalCoroutinesApi
26 import kotlinx.coroutines.cancelAndJoin
27 import kotlinx.coroutines.test.TestCoroutineDispatcher
28 import kotlinx.coroutines.test.resetMain
29 import kotlinx.coroutines.test.runBlockingTest
30 import kotlinx.coroutines.test.setMain
31 import org.apache.kafka.clients.consumer.KafkaConsumer
32 import org.apache.kafka.common.TopicPartition
33 import org.assertj.core.api.Assertions.assertThat
34 import org.jetbrains.spek.api.Spek
35 import org.jetbrains.spek.api.dsl.given
36 import org.jetbrains.spek.api.dsl.it
37 import org.jetbrains.spek.api.dsl.on
38 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
39
40 @ExperimentalCoroutinesApi
41 object OffsetKafkaConsumerTest : Spek({
42     given("OffsetKafkaConsumer") {
43         val testDispatcher = TestCoroutineDispatcher()
44         val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
45         val mockedMetrics = mock<Metrics>()
46
47         afterEachTest {
48             testDispatcher.cleanupTestCoroutines()
49             reset(mockedMetrics)
50         }
51
52         given("single topicName and partition") {
53             val topicName = "topicName"
54             val topics = setOf(topicName)
55             val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
56
57             on("started OffsetKafkaConsumer") {
58                 val topicPartition = createTopicPartition(topicName)
59                 val topicPartitions = setOf(topicPartition)
60                 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
61                 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
62                         .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
63
64                 runBlockingTest(testDispatcher) {
65                     val job = offsetKafkaConsumer.start(updateIntervalInMs)
66                     job.cancelAndJoin()
67                 }
68
69                 it("should notify offset changed with $topicName") {
70                     verify(mockedMetrics).notifyOffsetChanged(newOffset, topicPartition)
71                 }
72             }
73         }
74
75         given("two topics with partition") {
76             val topics = setOf(topicName1, topicName2)
77             val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
78
79             on("started OffsetKafkaConsumer for two iteration of while loop") {
80                 val offsetArgumentCaptor = argumentCaptor<Long>()
81                 val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
82
83                 val topicPartition1 = createTopicPartition(topicName1)
84                 val topicPartition2 = createTopicPartition(topicName2)
85                 val topicPartitions = setOf(topicPartition1, topicPartition2)
86
87                 whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
88
89                 val partitionToOffset1 =
90                         mapOf(topicPartition1 to newOffset,
91                                 topicPartition2 to anotherNewOffset)
92                 val partitionToOffset2 =
93                         mapOf(topicPartition1 to anotherNewOffset,
94                                 topicPartition2 to newOffset)
95                 whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
96                         .thenReturn(partitionToOffset1, partitionToOffset2)
97
98                 runBlockingTest(testDispatcher) {
99                     val job = kafkaSource.start(updateIntervalInMs)
100                     verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged(
101                             offsetArgumentCaptor.capture(),
102                             topicPartitionArgumentCaptor.capture()
103                     )
104
105                     it("should notify offset changed with proper arguments - before interval"){
106                         assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
107                         assertThat(topicPartitionArgumentCaptor.firstValue.topic())
108                                 .isEqualToIgnoringCase(topicPartition1.topic())
109                         assertThat(topicPartitionArgumentCaptor.firstValue.partition())
110                                 .isEqualTo(topicPartition1.partition())
111
112                         assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
113                         assertThat(topicPartitionArgumentCaptor.secondValue.topic())
114                                 .isEqualToIgnoringCase(topicPartition2.topic())
115                         assertThat(topicPartitionArgumentCaptor.secondValue.partition())
116                                 .isEqualTo(topicPartition2.partition())
117                     }
118                     reset(mockedMetrics)
119                     advanceTimeBy(updateIntervalInMs)
120
121                     job.cancelAndJoin()
122
123                     verify(mockedMetrics, times(topicsAmount)).notifyOffsetChanged(
124                             offsetArgumentCaptor.capture(),
125                             topicPartitionArgumentCaptor.capture()
126                     )
127
128                     it("should notify offset changed with proper arguments - after interval") {
129                         assertThat(offsetArgumentCaptor.thirdValue).isEqualTo(anotherNewOffset)
130                         assertThat(topicPartitionArgumentCaptor.thirdValue.topic())
131                                 .isEqualToIgnoringCase(topicPartition1.topic())
132                         assertThat(topicPartitionArgumentCaptor.thirdValue.partition())
133                                 .isEqualTo(topicPartition1.partition())
134
135                         assertThat(offsetArgumentCaptor.lastValue).isEqualTo(newOffset)
136                         assertThat(topicPartitionArgumentCaptor.lastValue.topic())
137                                 .isEqualToIgnoringCase(topicPartition2.topic())
138                         assertThat(topicPartitionArgumentCaptor.lastValue.partition())
139                                 .isEqualTo(topicPartition2.partition())
140                     }
141                 }
142             }
143         }
144
145         given("empty topicName list") {
146             val emptyTopics = emptySet<String>()
147             val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher)
148
149             on("call of function start") {
150                 val emptyTopicPartitions = setOf(null)
151                 whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
152                 whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
153                         .thenReturn(emptyMap())
154
155                 runBlockingTest(testDispatcher) {
156                     val job = offsetKafkaConsumer.start(updateIntervalInMs)
157                     job.cancelAndJoin()
158                 }
159
160                 it("should not interact with metrics") {
161                     verifyZeroInteractions(mockedMetrics)
162                 }
163             }
164         }
165
166     }
167 })
168
169 private const val updateIntervalInMs = 10L
170 private const val partitionNumber = 0
171 private const val newOffset = 2L
172 private const val anotherNewOffset = 10L
173 private const val topicName1 = "topicName1"
174 private const val topicName2 = "topicName2"
175 private const val topicsAmount = 2
176 fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)