2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.kafka.api
22 import com.nhaarman.mockitokotlin2.mock
23 import org.assertj.core.api.Assertions.assertThat
24 import org.assertj.core.api.Assertions.entry
25 import org.jetbrains.spek.api.Spek
26 import org.jetbrains.spek.api.dsl.describe
27 import org.jetbrains.spek.api.dsl.given
28 import org.jetbrains.spek.api.dsl.it
29 import org.jetbrains.spek.api.dsl.on
31 object ConsumerFactoryTest : Spek({
32 describe("ConsumerFactory") {
33 val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41"
34 given("consumer provider"){
35 val mockedKafkaConsumer = mock<KafkaConsumer>()
36 val consumerProvider = { mockedKafkaConsumer }
37 on("creation of consumer") {
38 val kafkaTopics = setOf("topic1", "topic2")
39 val consumer = ConsumerFactory.createConsumersForTopics(
40 kafkaBootstrapServers,
43 it("should create consumer"){
44 assertThat(consumer).isNotEmpty.hasSize(2)
45 assertThat(consumer).contains(entry("topic1", mockedKafkaConsumer),
46 entry("topic2", mockedKafkaConsumer))
49 on("empty kafkaTopics set"){
50 val emptyKafkaTopics = emptySet<String>()
51 val consumer = ConsumerFactory.createConsumersForTopics(
52 kafkaBootstrapServers,
56 it("should not create consumer"){
57 assertThat(consumer).isEmpty()