0954b76e38b6e564975a41f6a456a7de654d17ae
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-configuration / src / test / kotlin / org / onap / dcae / collectors / veshv / config / impl / CbsConfigurationProviderTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.config.impl
21
22 import arrow.core.Some
23 import com.google.gson.JsonParser
24 import com.nhaarman.mockitokotlin2.mock
25 import com.nhaarman.mockitokotlin2.times
26 import com.nhaarman.mockitokotlin2.verify
27 import com.nhaarman.mockitokotlin2.whenever
28 import org.assertj.core.api.Assertions.assertThat
29 import org.jetbrains.spek.api.Spek
30 import org.jetbrains.spek.api.dsl.describe
31 import org.jetbrains.spek.api.dsl.given
32 import org.jetbrains.spek.api.dsl.it
33 import org.jetbrains.spek.api.dsl.on
34 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
35 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
37 import reactor.core.publisher.Flux
38 import reactor.core.publisher.Mono
39 import reactor.retry.Retry
40 import reactor.test.StepVerifier
41 import java.time.Duration
42
43 /**
44  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
45  * @since May 2018
46  */
47 internal object CbsConfigurationProviderTest : Spek({
48
49     describe("Configuration provider") {
50
51         val cbsClientAdapter: CbsClientAdapter = mock()
52         val configStateListener: ConfigurationStateListener = mock()
53
54         given("configuration is never in cbs") {
55             val cbsClientMock: CbsClient = mock()
56             val configProvider = constructConfigurationProvider(
57                     constructCbsClientAdapter(cbsClientMock, configStateListener),
58                     configStateListener
59             )
60
61             on("waiting for configuration") {
62                 val waitTime = Duration.ofMillis(100)
63
64                 it("should not get it") {
65                     StepVerifier.create(configProvider().take(1))
66                             .expectNoEvent(waitTime)
67                 }
68             }
69         }
70
71         given("valid configuration from cbs") {
72             val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener)
73
74             on("new configuration") {
75                 whenever(cbsClientAdapter.configurationUpdates())
76                         .thenReturn(Flux.just(validConfiguration))
77                 it("should use received configuration") {
78
79                     StepVerifier.create(configProvider().take(1))
80                             .consumeNextWith {
81                                 assertThat(it.requestIntervalSec).isEqualTo(Some(5L))
82                                 assertThat(it.listenPort).isEqualTo(Some(6061))
83                                 assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
84
85                                 val sinks = it.streamPublishers.orNull()!!
86                                 val sink1 = sinks[0]
87                                 val sink2 = sinks[1]
88
89                                 assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL)
90                                 assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1)
91                                 assertThat(sink1.bootstrapServers())
92                                         .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
93                                 assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
94
95                                 assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL)
96                                 assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2)
97                                 assertThat(sink2.bootstrapServers())
98                                         .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
99                                 assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
100                             }.verifyComplete()
101                 }
102             }
103         }
104
105         given("invalid configuration from cbs") {
106             val iterationCount = 3L
107             val configProvider = constructConfigurationProvider(
108                     cbsClientAdapter, configStateListener, iterationCount
109             )
110
111             on("new configuration") {
112                 whenever(cbsClientAdapter.configurationUpdates())
113                         .thenReturn(Flux.just(invalidConfiguration))
114
115                 it("should interrupt the flux") {
116                     StepVerifier
117                             .create(configProvider())
118                             .verifyError()
119                 }
120
121                 it("should call state listener when retrying") {
122                     verify(configStateListener, times(iterationCount.toInt())).retrying()
123                 }
124             }
125         }
126     }
127
128 })
129
130
131 private const val PERF3GPP_REGIONAL = "perf3gpp_regional"
132 private const val PERF3GPP_CENTRAL = "perf3gpp_central"
133
134 private val aafCredentials1 = ImmutableAafCredentials.builder()
135         .username("client")
136         .password("very secure password")
137         .build()
138
139 private val aafCredentials2 = ImmutableAafCredentials.builder()
140         .username("other_client")
141         .password("another very secure password")
142         .build()
143
144 private val validConfiguration = JsonParser().parse("""
145 {
146     "server.listenPort": 6061,
147     "server.idleTimeoutSec": 60,
148     "cbs.requestIntervalSec": 5,
149     "streams_publishes": {
150         "$PERF3GPP_REGIONAL": {
151             "type": "kafka",
152             "aaf_credentials": {
153                 "username": "client",
154                 "password": "very secure password"
155             },
156             "kafka_info": {
157                 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
158                 "topic_name": "REG_HVVES_PERF3GPP"
159             }
160         },
161         "$PERF3GPP_CENTRAL": {
162             "type": "kafka",
163             "aaf_credentials": {
164                 "username": "other_client",
165                 "password": "another very secure password"
166             },
167             "kafka_info": {
168                 "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
169                 "topic_name": "CEN_HVVES_PERF3GPP"
170             }
171         }
172     }
173 }""").asJsonObject
174
175 private val invalidConfiguration = JsonParser().parse("""
176 {
177     "streams_publishes": {
178         "$PERF3GPP_REGIONAL": {
179             "type": "kafka",
180             "aaf_credentials": {
181                 "user": "client",
182                 "password": "very secure password"
183             },
184             "kafka_info": {
185                 "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
186                 "name": "REG_HVVES_PERF3GPP"
187             }
188         }
189     }
190 }""").asJsonObject
191
192 private val firstRequestDelay = Duration.ofMillis(1)
193 private val configParser = JsonConfigurationParser()
194
195 private fun constructCbsClientAdapter(cbsClientMock: CbsClient, configStateListener: ConfigurationStateListener) =
196         CbsClientAdapter(Mono.just(cbsClientMock), firstRequestDelay, configStateListener, mdc, retry())
197
198 private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
199                                            configurationStateListener: ConfigurationStateListener,
200                                            iterationCount: Long = 1) = CbsConfigurationProvider(
201         cbsClientAdapter,
202         configParser,
203         configurationStateListener,
204         mdc,
205         retry(iterationCount)
206 )