/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2019 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.dcae.collectors.veshv.kafkaconsumer.impl

import com.nhaarman.mockitokotlin2.argumentCaptor
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.times
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.verifyZeroInteractions
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.runBlockingTest
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer

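/*
 * Spek specification for KafkaSource. The polling loop exercised here is assumed
 * (a sketch based on this test's expectations, not the production implementation)
 * to look roughly like:
 *
 *     fun start(offsetConsumer: OffsetConsumer, updateInterval: Long): Job =
 *         launch(dispatcher) {
 *             while (isActive) {
 *                 val partitions = kafkaConsumer.assignment()
 *                 kafkaConsumer.endOffsets(partitions).forEach { (partition, offset) ->
 *                     offsetConsumer.update(partition, offset)
 *                 }
 *                 delay(updateInterval)
 *             }
 *         }
 *
 * Each scenario stubs KafkaConsumer.assignment()/endOffsets() and verifies the
 * offsets forwarded to OffsetConsumer.update().
 */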
@ExperimentalCoroutinesApi
object KafkaSourceTest : Spek({
    given("KafkaSource") {
        val testDispatcher = TestCoroutineDispatcher()
        val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
        afterEachTest {
            testDispatcher.cleanupTestCoroutines()
        }
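        // Single topic, single partition: one poll should forward the stubbed end offset
        // to OffsetConsumer before the job is cancelled.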
        given("single topicName and partition") {
            val topics = setOf("topicName")
            val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
            val mockedOffsetConsumer = mock<OffsetConsumer>()
            on("started KafkaSource") {
                val topicPartition = createTopicPartition("topicName")
                val topicPartitions = setOf(topicPartition)
                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
                        .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))

                runBlockingTest {
                    val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
                    job.cancelAndJoin()
                }

                it("should call update function for topicName") {
                    verify(mockedOffsetConsumer).update(topicPartition, newOffset)
                }
            }
        }

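        // Two topics: consecutive polls see different end offsets, which are captured via
        // argument captors and checked before and after advancing the virtual clock by one interval.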
        given("two topics with partition") {
            val topics = setOf(topicName1, topicName2)
            val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
            val mockedOffsetConsumer = mock<OffsetConsumer>()

            on("started KafkaSource for two iterations of the while loop") {
                val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
                val offsetArgumentCaptor = argumentCaptor<Long>()
                val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>()
                val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>()
                val topicPartition1 = createTopicPartition(topicName1)
                val topicPartition2 = createTopicPartition(topicName2)
                val topicPartitions = setOf(topicPartition1, topicPartition2)
                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
                val partitionToOffset1 =
                        mapOf(topicPartition1 to newOffset,
                                topicPartition2 to anotherNewOffset)
                val partitionToOffset2 =
                        mapOf(topicPartition1 to anotherNewOffset,
                                topicPartition2 to newOffset)
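                // endOffsets is stubbed with two consecutive return values: the first poll
                // sees partitionToOffset1, the second poll sees partitionToOffset2.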
                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
                        .thenReturn(partitionToOffset1, partitionToOffset2)

                runBlockingTest {
                    val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
                    verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(),
                            offsetArgumentCaptor.capture())

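                    // Advancing the TestCoroutineDispatcher's virtual time by one update interval
                    // lets the polling loop run a second iteration; the verification count below is
                    // therefore cumulative (2 updates from the first poll + 2 from the second).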
                    testDispatcher.advanceTimeBy(updateIntervalInMs)

                    verify(mockedOffsetConsumer, times(topicsAmountAfterInterval))
                            .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture())

                    it("should call update function with proper arguments - before interval") {
                        assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1)
                        assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
                        assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2)
                        assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
                    }
                    it("should call update function with proper arguments - after interval") {
                        assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1)
                        assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset)
                        assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2)
                        assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset)
                    }
                    job.cancelAndJoin()
                }
            }
        }

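        // Empty topic set: no partitions are assigned, so OffsetConsumer must never be touched.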
        given("empty topicName list") {
            val emptyTopics = emptySet<String>()
            val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher)
            val mockedOffsetConsumer = mock<OffsetConsumer>()

            on("call of the start function") {
                val emptyTopicPartitions = emptySet<TopicPartition>()
                whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
                whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
                        .thenReturn(emptyMap())

                runBlockingTest {
                    val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
                    job.cancelAndJoin()
                }

                it("should not interact with OffsetConsumer") {
                    verifyZeroInteractions(mockedOffsetConsumer)
                }
            }
        }

    }
})

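// Shared test fixtures; every TopicPartition created by the helper below uses partition 0.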
private const val updateIntervalInMs = 10L
private const val partitionNumber = 0
private const val newOffset = 2L
private const val anotherNewOffset = 10L
private const val topicName1 = "topicName1"
private const val topicName2 = "topicName2"
private const val topicsAmount = 2
private const val topicsAmountAfterInterval = 4

fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)