8616ce03e69f9ce4332823a274580584abacda2f
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.google.gson.JsonParser
23 import com.nhaarman.mockitokotlin2.any
24 import com.nhaarman.mockitokotlin2.eq
25 import com.nhaarman.mockitokotlin2.mock
26 import com.nhaarman.mockitokotlin2.whenever
27 import org.assertj.core.api.Assertions.assertThat
28 import org.jetbrains.spek.api.Spek
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.given
31 import org.jetbrains.spek.api.dsl.it
32 import org.jetbrains.spek.api.dsl.on
33 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
35 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
38 import reactor.core.publisher.Flux
39
40 import reactor.core.publisher.Mono
41 import reactor.retry.Retry
42 import reactor.test.StepVerifier
43 import java.time.Duration
44
45 /**
46  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
47  * @since May 2018
48  */
49 internal object ConfigurationProviderImplTest : Spek({
50
51     describe("Configuration provider") {
52
53         val cbsClient: CbsClient = mock()
54         val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
55         val healthStateProvider = HealthState.INSTANCE
56
57         given("configuration is never in cbs") {
58             val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
59
60             on("waiting for configuration") {
61                 val waitTime = Duration.ofMillis(100)
62
63                 it("should not get it") {
64                     StepVerifier.create(configProvider().take(1))
65                             .expectNoEvent(waitTime)
66                 }
67             }
68         }
69
70         given("valid configuration from cbs") {
71             val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
72
73             on("new configuration") {
74                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
75                         .thenReturn(Flux.just(validConfiguration))
76                 it("should use received configuration") {
77
78                     StepVerifier.create(configProvider().take(1))
79                             .consumeNextWith {
80                                 val route1 = it.elementAt(0)
81                                 val route2 = it.elementAt(1)
82                                 val receivedSink1 = route1.sink
83                                 val receivedSink2 = route2.sink
84
85                                 assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
86                                 assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
87                                 assertThat(receivedSink1.bootstrapServers())
88                                         .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
89                                 assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
90
91                                 assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
92                                 assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
93                                 assertThat(receivedSink2.bootstrapServers())
94                                         .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
95                                 assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
96
97                             }.verifyComplete()
98                 }
99             }
100
101         }
102         given("invalid configuration from cbs") {
103             val iterationCount = 3L
104             val configProvider = constructConfigurationProvider(
105                     cbsClientMock, healthStateProvider, iterationCount
106             )
107
108             on("new configuration") {
109                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
110                         .thenReturn(Flux.just(invalidConfiguration))
111
112                 it("should interrupt the flux") {
113                     StepVerifier.create(configProvider())
114                             .verifyError()
115                 }
116
117                 it("should update the health state") {
118                     StepVerifier.create(healthStateProvider().take(iterationCount))
119                             .expectNextCount(iterationCount - 1)
120                             .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
121                             .verifyComplete()
122                 }
123             }
124         }
125     }
126
127 })
128
129
130 val PERF3GPP_REGIONAL = "perf3gpp_regional"
131 val PERF3GPP_CENTRAL = "perf3gpp_central"
132
133 private val aafCredentials1 = ImmutableAafCredentials.builder()
134         .username("client")
135         .password("very secure password")
136         .build()
137
138 private val aafCredentials2 = ImmutableAafCredentials.builder()
139         .username("other_client")
140         .password("another very secure password")
141         .build()
142
143 private val validConfiguration = JsonParser().parse("""
144 {
145     "streams_publishes": {
146         "$PERF3GPP_REGIONAL": {
147             "type": "kafka",
148             "aaf_credentials": {
149                 "username": "client",
150                 "password": "very secure password"
151             },
152             "kafka_info": {
153                 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
154                 "topic_name": "REG_HVVES_PERF3GPP"
155             }
156         },
157         "$PERF3GPP_CENTRAL": {
158             "type": "kafka",
159             "aaf_credentials": {
160                 "username": "other_client",
161                 "password": "another very secure password"
162             },
163             "kafka_info": {
164                 "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
165                 "topic_name": "CEN_HVVES_PERF3GPP"
166             }
167         }
168     }
169 }""").asJsonObject
170
171 private val invalidConfiguration = JsonParser().parse("""
172 {
173     "streams_publishes": {
174         "$PERF3GPP_REGIONAL": {
175             "type": "kafka",
176             "aaf_credentials": {
177                 "username": "client",
178                 "password": "very secure password"
179             },
180             "kafka_info": {
181                 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
182                 "popic_name": "REG_HVVES_PERF3GPP"
183             }
184         }
185     }
186 }""").asJsonObject
187
188 private val firstRequestDelay = Duration.ofMillis(1)
189 private val requestInterval = Duration.ofMillis(1)
190 private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
191
192 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
193                                            healthState: HealthState,
194                                            iterationCount: Long = 1
195 ): ConfigurationProviderImpl {
196
197     val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
198
199     return ConfigurationProviderImpl(
200             cbsClientMono,
201             firstRequestDelay,
202             requestInterval,
203             healthState,
204             streamParser,
205             retry
206     )
207 }