Extract HV-VES configuration module
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaSinkProviderTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
21
22 import arrow.syntax.collections.tail
23 import com.nhaarman.mockitokotlin2.mock
24 import com.nhaarman.mockitokotlin2.verify
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.jetbrains.spek.api.dsl.on
31 import org.onap.dcae.collectors.veshv.config.api.model.KafkaConfiguration
32 import org.onap.dcae.collectors.veshv.model.ClientContext
33 import org.onap.dcae.collectors.veshv.domain.VesMessage
34 import org.onap.ves.VesEventOuterClass
35 import reactor.kafka.sender.KafkaSender
36
37 /**
38  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
39  * @since December 2018
40  */
41 internal object KafkaSinkProviderTest : Spek({
42     describe("non functional requirements") {
43         given("sample configuration") {
44             val config = KafkaConfiguration("localhost:9090",
45                     1024 * 1024)
46             val cut = KafkaSinkProvider(config)
47
48             on("sample clients") {
49                 val clients = listOf(
50                         ClientContext(),
51                         ClientContext(),
52                         ClientContext(),
53                         ClientContext())
54
55                 it("should create only one instance of KafkaSender") {
56                     val sinks = clients.map(cut::invoke)
57                     val firstSink = sinks[0]
58                     val restOfSinks = sinks.tail()
59
60                     assertThat(restOfSinks).isNotEmpty
61                     assertThat(restOfSinks).allSatisfy { sink ->
62                         assertThat(firstSink.usesSameSenderAs(sink))
63                                 .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
64                                 .isTrue()
65                     }
66                 }
67             }
68         }
69
70         given("dummy KafkaSender") {
71             val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
72             val cut = KafkaSinkProvider(kafkaSender)
73
74             on("close") {
75                 cut.close().unsafeRunSync()
76
77                 it("should close KafkaSender") {
78                     verify(kafkaSender).close()
79                 }
80             }
81         }
82     }
83 })