2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters
22 import com.google.gson.JsonParser
23 import com.nhaarman.mockitokotlin2.any
24 import com.nhaarman.mockitokotlin2.eq
25 import com.nhaarman.mockitokotlin2.mock
26 import com.nhaarman.mockitokotlin2.whenever
27 import org.assertj.core.api.Assertions.assertThat
28 import org.jetbrains.spek.api.Spek
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.given
31 import org.jetbrains.spek.api.dsl.it
32 import org.jetbrains.spek.api.dsl.on
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
34 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
35 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
36 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
38 import reactor.core.publisher.Flux
40 import reactor.core.publisher.Mono
41 import reactor.retry.Retry
42 import reactor.test.StepVerifier
43 import java.time.Duration
46 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
49 internal object ConfigurationProviderImplTest : Spek({
51 describe("Configuration provider") {
53 val cbsClient: CbsClient = mock()
54 val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
55 val healthStateProvider = HealthState.INSTANCE
57 given("configuration is never in cbs") {
58 val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
60 on("waiting for configuration") {
61 val waitTime = Duration.ofMillis(100)
63 it("should not get it") {
64 StepVerifier.create(configProvider().take(1))
65 .expectNoEvent(waitTime)
70 given("valid configuration from cbs") {
71 val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
73 on("new configuration") {
74 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
75 .thenReturn(Flux.just(validConfiguration))
76 it("should use received configuration") {
78 StepVerifier.create(configProvider().take(1))
81 val route1 = it.routing.routes[0]
82 assertThat(FAULT.domainName)
83 .describedAs("routed domain 1")
84 .isEqualTo(route1.domain)
85 assertThat("test-topic-1")
86 .describedAs("target topic 1")
87 .isEqualTo(route1.targetTopic)
89 val route2 = it.routing.routes[1]
90 assertThat(HEARTBEAT.domainName)
91 .describedAs("routed domain 2")
92 .isEqualTo(route2.domain)
93 assertThat("test-topic-2")
94 .describedAs("target topic 2")
95 .isEqualTo(route2.targetTopic)
102 given("invalid configuration from cbs") {
103 val iterationCount = 3L
104 val configProvider = constructConfigurationProvider(
105 cbsClientMock, healthStateProvider, iterationCount
108 on("new configuration") {
109 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
110 .thenReturn(Flux.just(invalidConfiguration))
112 it("should interrupt the flux") {
113 StepVerifier.create(configProvider())
117 it("should update the health state") {
118 StepVerifier.create(healthStateProvider().take(iterationCount))
119 .expectNextCount(iterationCount - 1)
120 .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
130 private val validConfiguration = JsonParser().parse("""
132 "whatever": "garbage",
133 "collector.routing": [
135 "fromDomain": "fault",
136 "toTopic": "test-topic-1"
139 "fromDomain": "heartbeat",
140 "toTopic": "test-topic-2"
145 private val invalidConfiguration = JsonParser().parse("""
147 "whatever": "garbage",
148 "collector.routing": [
150 "fromDomain": "garbage",
151 "meaningful": "garbage"
156 private val firstRequestDelay = Duration.ofMillis(1)
157 private val requestInterval = Duration.ofMillis(1)
159 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
160 healthState: HealthState,
161 iterationCount: Long = 1
162 ): ConfigurationProviderImpl {
164 val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
166 return ConfigurationProviderImpl(