2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.config.impl
22 import com.google.gson.JsonParser
23 import com.nhaarman.mockitokotlin2.any
24 import com.nhaarman.mockitokotlin2.eq
25 import com.nhaarman.mockitokotlin2.mock
26 import com.nhaarman.mockitokotlin2.times
27 import com.nhaarman.mockitokotlin2.verify
28 import com.nhaarman.mockitokotlin2.whenever
29 import org.assertj.core.api.Assertions.assertThat
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.given
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
36 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
37 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
40 import reactor.core.publisher.Flux
41 import reactor.core.publisher.Mono
42 import reactor.retry.Retry
43 import reactor.test.StepVerifier
44 import java.time.Duration
47 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
50 internal object CbsConfigurationProviderTest : Spek({
52 describe("Configuration provider") {
54 val cbsClient: CbsClient = mock()
55 val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
56 val configStateListener: ConfigurationStateListener = mock()
58 given("configuration is never in cbs") {
59 val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
61 on("waiting for configuration") {
62 val waitTime = Duration.ofMillis(100)
64 it("should not get it") {
65 StepVerifier.create(configProvider().take(1))
66 .expectNoEvent(waitTime)
71 given("valid configuration from cbs") {
72 val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
74 on("new configuration") {
75 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
76 .thenReturn(Flux.just(validConfiguration))
77 it("should use received configuration") {
79 StepVerifier.create(configProvider().take(1))
81 val routes = it.routing.orNull()!!
82 val route1 = routes.elementAt(0)
83 val route2 = routes.elementAt(1)
84 val receivedSink1 = route1.sink
85 val receivedSink2 = route2.sink
87 assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
88 assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
89 assertThat(receivedSink1.bootstrapServers())
90 .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
91 assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
93 assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
94 assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
95 assertThat(receivedSink2.bootstrapServers())
96 .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
97 assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
104 given("invalid configuration from cbs") {
105 val iterationCount = 3L
106 val configProvider = constructConfigurationProvider(
107 cbsClientMock, configStateListener, iterationCount
110 on("new configuration") {
111 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
112 .thenReturn(Flux.just(invalidConfiguration))
114 it("should interrupt the flux") {
115 StepVerifier.create(configProvider())
119 it("should call state listener when retrying") {
120 verify(configStateListener, times(iterationCount.toInt())).retrying()
129 val PERF3GPP_REGIONAL = "perf3gpp_regional"
130 val PERF3GPP_CENTRAL = "perf3gpp_central"
132 private val aafCredentials1 = ImmutableAafCredentials.builder()
134 .password("very secure password")
137 private val aafCredentials2 = ImmutableAafCredentials.builder()
138 .username("other_client")
139 .password("another very secure password")
142 private val validConfiguration = JsonParser().parse("""
144 "streams_publishes": {
145 "$PERF3GPP_REGIONAL": {
148 "username": "client",
149 "password": "very secure password"
152 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
153 "topic_name": "REG_HVVES_PERF3GPP"
156 "$PERF3GPP_CENTRAL": {
159 "username": "other_client",
160 "password": "another very secure password"
163 "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
164 "topic_name": "CEN_HVVES_PERF3GPP"
170 private val invalidConfiguration = JsonParser().parse("""
172 "streams_publishes": {
173 "$PERF3GPP_REGIONAL": {
176 "username": "client",
177 "password": "very secure password"
180 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
181 "popic_name": "REG_HVVES_PERF3GPP"
187 private val firstRequestDelay = Duration.ofMillis(1)
188 private val requestInterval = Duration.ofMillis(1)
189 private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
191 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
192 configurationStateListener: ConfigurationStateListener,
193 iterationCount: Long = 1
194 ): CbsConfigurationProvider {
196 val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
198 return CbsConfigurationProvider(
200 CbsConfiguration(firstRequestDelay, requestInterval),
202 configurationStateListener,
204 { mapOf("k" to "v") }