31415454df6fd69cb2bf5c780ac8519869716892
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-configuration / src / test / kotlin / org / onap / dcae / collectors / veshv / config / impl / CbsConfigurationProviderTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.config.impl
21
22 import arrow.core.Some
23 import com.google.gson.JsonParser
24 import com.nhaarman.mockitokotlin2.any
25 import com.nhaarman.mockitokotlin2.mock
26 import com.nhaarman.mockitokotlin2.times
27 import com.nhaarman.mockitokotlin2.verify
28 import com.nhaarman.mockitokotlin2.whenever
29 import org.assertj.core.api.Assertions.assertThat
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.given
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
36 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
38 import reactor.core.publisher.Flux
39 import reactor.core.publisher.Mono
40 import reactor.retry.Retry
41 import reactor.test.StepVerifier
42 import java.time.Duration
43
/**
 * Spek specification of [CbsConfigurationProvider].
 *
 * Covers three scenarios: CBS never delivers a configuration, CBS delivers a valid configuration,
 * and CBS delivers an invalid configuration.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal object CbsConfigurationProviderTest : Spek({

    describe("Configuration provider") {

        // Both mocks are shared by the scenarios below; configurationUpdates() is re-stubbed per scenario.
        val cbsClientAdapter = mock<CbsClientAdapter>()
        val configStateListener = mock<ConfigurationStateListener>()

        given("configuration is never in cbs") {
            val cbsClientMock = mock<CbsClient>()
            // An unstubbed mock would answer null; make the client explicitly never deliver anything.
            // NOTE(review): assumes the adapter polls CBS through CbsClient.get - confirm against CbsClientAdapter.
            whenever(cbsClientMock.get(any())).thenReturn(Mono.never())

            val configProvider = constructConfigurationProvider(
                    constructCbsClientAdapter(cbsClientMock, configStateListener),
                    configStateListener
            )

            on("waiting for configuration") {
                val waitTime = Duration.ofMillis(100)

                it("should not get it") {
                    // A StepVerifier only subscribes once verify() is called, so the chain has to be
                    // terminated. expectSubscription() comes first because expectNoEvent() would
                    // otherwise treat the subscription signal itself as an unexpected event.
                    StepVerifier.create(configProvider().take(1))
                            .expectSubscription()
                            .expectNoEvent(waitTime)
                            .thenCancel()
                            .verify()
                }
            }
        }

        given("valid configuration from cbs") {
            val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener)

            on("new configuration") {
                whenever(cbsClientAdapter.configurationUpdates(any()))
                        .thenReturn(Flux.just(validConfiguration))

                it("should use received configuration") {
                    StepVerifier.create(configProvider().take(1))
                            .consumeNextWith { config ->
                                assertThat(config.requestIntervalSec).isEqualTo(Some(5L))
                                assertThat(config.listenPort).isEqualTo(Some(6061))
                                assertThat(config.idleTimeoutSec).isEqualTo(Some(60L))

                                val sinks = requireNotNull(config.streamPublishers.orNull()) {
                                    "parsed configuration contains no stream publishers"
                                }
                                val sink1 = sinks[0]
                                val sink2 = sinks[1]

                                assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL)
                                assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1)
                                assertThat(sink1.bootstrapServers())
                                        .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
                                assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")

                                assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL)
                                assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2)
                                assertThat(sink2.bootstrapServers())
                                        .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
                                assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
                            }.verifyComplete()
                }
            }
        }

        given("invalid configuration from cbs") {
            val iterationCount = 3L
            val configProvider = constructConfigurationProvider(
                    cbsClientAdapter, configStateListener, iterationCount
            )

            on("new configuration") {
                whenever(cbsClientAdapter.configurationUpdates(any()))
                        .thenReturn(Flux.just(invalidConfiguration))

                it("should interrupt the flux") {
                    StepVerifier
                            .create(configProvider())
                            .verifyError()
                }

                // Counts the calls made on the shared listener mock, so it relies on the
                // subscription performed by the preceding assertion having already run.
                it("should call state listener when retrying") {
                    verify(configStateListener, times(iterationCount.toInt())).retrying()
                }
            }
        }
    }

})
130
131
/** Stream name used as the key of the regional sink in the test configurations. */
private const val PERF3GPP_REGIONAL: String = "perf3gpp_regional"

/** Stream name used as the key of the central sink in the valid test configuration. */
private const val PERF3GPP_CENTRAL: String = "perf3gpp_central"

/** Credentials matching the `aaf_credentials` of the regional sink in the valid configuration. */
private val aafCredentials1 = ImmutableAafCredentials.builder().run {
    username("client")
    password("very secure password")
    build()
}

/** Credentials matching the `aaf_credentials` of the central sink in the valid configuration. */
private val aafCredentials2 = ImmutableAafCredentials.builder().run {
    username("other_client")
    password("another very secure password")
    build()
}
144
/**
 * Configuration document with server settings, a CBS request interval and two kafka stream
 * publishers (regional and central), parsed into a JSON object.
 */
private val validConfiguration = """
{
    "server.listenPort": 6061,
    "server.idleTimeoutSec": 60,
    "cbs.requestIntervalSec": 5,
    "streams_publishes": {
        "$PERF3GPP_REGIONAL": {
            "type": "kafka",
            "aaf_credentials": {
                "username": "client",
                "password": "very secure password"
            },
            "kafka_info": {
                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
                "topic_name": "REG_HVVES_PERF3GPP"
            }
        },
        "$PERF3GPP_CENTRAL": {
            "type": "kafka",
            "aaf_credentials": {
                "username": "other_client",
                "password": "another very secure password"
            },
            "kafka_info": {
                "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
                "topic_name": "CEN_HVVES_PERF3GPP"
            }
        }
    }
}""".let { rawJson -> JsonParser().parse(rawJson).asJsonObject }
175
/**
 * Configuration document whose single stream publisher uses the keys `user`, `servers` and `name`
 * where the valid document uses `username`, `bootstrap_servers` and `topic_name`.
 */
private val invalidConfiguration = """
{
    "streams_publishes": {
        "$PERF3GPP_REGIONAL": {
            "type": "kafka",
            "aaf_credentials": {
                "user": "client",
                "password": "very secure password"
            },
            "kafka_info": {
                "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
                "name": "REG_HVVES_PERF3GPP"
            }
        }
    }
}""".let { rawJson -> JsonParser().parse(rawJson).asJsonObject }
192
/** Delay handed to every [CbsClientAdapter] built by this file; one millisecond keeps the tests fast. */
private val firstRequestDelay: Duration = Duration.ofMillis(1)

/** Parser instance shared by every [CbsConfigurationProvider] built by this file. */
private val configParser: JsonConfigurationParser = JsonConfigurationParser()
195
/**
 * Builds a retry policy that retries while the retry iteration does not exceed [iterationCount],
 * waiting a fixed one nanosecond between attempts.
 */
private fun retry(iterationCount: Long = 1): Retry<Any> {
    val withinLimit = Retry.onlyIf<Any> { context -> context.iteration() <= iterationCount }
    return withinLimit.fixedBackoff(Duration.ofNanos(1))
}
199
/**
 * Wraps [cbsClientMock] in a [CbsClientAdapter] wired with [configStateListener],
 * the shared first-request delay and the default retry policy.
 */
private fun constructCbsClientAdapter(
        cbsClientMock: CbsClient,
        configStateListener: ConfigurationStateListener
): CbsClientAdapter {
    val clientSource = Mono.just(cbsClientMock)
    return CbsClientAdapter(clientSource, configStateListener, firstRequestDelay, retry())
}
202
/**
 * Creates the [CbsConfigurationProvider] under test.
 *
 * @param cbsClientAdapter adapter the provider reads configuration updates from
 * @param configurationStateListener listener handed to the provider
 * @param iterationCount limit passed to the retry policy given to the provider
 */
private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
                                           configurationStateListener: ConfigurationStateListener,
                                           iterationCount: Long = 1
): CbsConfigurationProvider {
    val retryPolicy = retry(iterationCount)
    return CbsConfigurationProvider(
            cbsClientAdapter,
            configParser,
            configurationStateListener,
            { mapOf("k" to "v") },
            retryPolicy
    )
}