571a6680963a1a1353aa341d2e78abfb78f408ca
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.google.gson.JsonParser
23 import com.nhaarman.mockitokotlin2.any
24 import com.nhaarman.mockitokotlin2.eq
25 import com.nhaarman.mockitokotlin2.mock
26 import com.nhaarman.mockitokotlin2.whenever
27 import org.assertj.core.api.Assertions.assertThat
28 import org.jetbrains.spek.api.Spek
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.given
31 import org.jetbrains.spek.api.dsl.it
32 import org.jetbrains.spek.api.dsl.on
33 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
35 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
38 import reactor.core.publisher.Flux
39 import reactor.core.publisher.Mono
40 import reactor.retry.Retry
41 import reactor.test.StepVerifier
42 import java.time.Duration
43
44 /**
45  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
46  * @since May 2018
47  */
48 internal object ConfigurationProviderImplTest : Spek({
49
50     describe("Configuration provider") {
51
52         val cbsClient: CbsClient = mock()
53         val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
54         val healthStateProvider = HealthState.INSTANCE
55
56         given("configuration is never in cbs") {
57             val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
58
59             on("waiting for configuration") {
60                 val waitTime = Duration.ofMillis(100)
61
62                 it("should not get it") {
63                     StepVerifier.create(configProvider().take(1))
64                             .expectNoEvent(waitTime)
65                 }
66             }
67
68         }
69         given("valid configuration from cbs") {
70             val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
71
72             on("new configuration") {
73                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
74                         .thenReturn(Flux.just(validConfiguration))
75                 it("should use received configuration") {
76
77                     StepVerifier.create(configProvider().take(1))
78                             .consumeNextWith {
79                                 val receivedSink1 = it.elementAt(0)
80                                 val receivedSink2 = it.elementAt(1)
81
82                                 assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
83                                 assertThat(receivedSink1.bootstrapServers())
84                                         .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
85                                 assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
86
87                                 assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
88                                 assertThat(receivedSink2.bootstrapServers())
89                                         .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
90                                 assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
91                             }.verifyComplete()
92                 }
93             }
94
95         }
96         given("invalid configuration from cbs") {
97             val iterationCount = 3L
98             val configProvider = constructConfigurationProvider(
99                     cbsClientMock, healthStateProvider, iterationCount
100             )
101
102             on("new configuration") {
103                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
104                         .thenReturn(Flux.just(invalidConfiguration))
105
106                 it("should interrupt the flux") {
107                     StepVerifier.create(configProvider())
108                             .verifyError()
109                 }
110
111                 it("should update the health state") {
112                     StepVerifier.create(healthStateProvider().take(iterationCount))
113                             .expectNextCount(iterationCount - 1)
114                             .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
115                             .verifyComplete()
116                 }
117             }
118         }
119     }
120
121 })
122
123 private val aafCredentials1 = ImmutableAafCredentials.builder()
124         .username("client")
125         .password("very secure password")
126         .build()
127
128 private val aafCredentials2 = ImmutableAafCredentials.builder()
129         .username("other_client")
130         .password("another very secure password")
131         .build()
132
133 private val validConfiguration = JsonParser().parse("""
134 {
135     "streams_publishes": {
136         "perf3gpp_regional": {
137             "type": "kafka",
138             "aaf_credentials": {
139                 "username": "client",
140                 "password": "very secure password"
141             },
142             "kafka_info": {
143                 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
144                 "topic_name": "REG_HVVES_PERF3GPP"
145             }
146         },
147         "perf3gpp_central": {
148             "type": "kafka",
149             "aaf_credentials": {
150                 "username": "other_client",
151                 "password": "another very secure password"
152             },
153             "kafka_info": {
154                 "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
155                 "topic_name": "CEN_HVVES_PERF3GPP"
156             }
157         }
158     }
159 }""").asJsonObject
160
161 private val invalidConfiguration = JsonParser().parse("""
162 {
163     "streams_publishes": {
164         "perf3gpp_regional": {
165             "type": "kafka",
166             "aaf_credentials": {
167                 "username": "client",
168                 "password": "very secure password"
169             },
170             "kafka_info": {
171                 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
172                 "popic_name": "REG_HVVES_PERF3GPP"
173             }
174         }
175     }
176 }""").asJsonObject
177
178 private val firstRequestDelay = Duration.ofMillis(1)
179 private val requestInterval = Duration.ofMillis(1)
180 private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
181
182 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
183                                            healthState: HealthState,
184                                            iterationCount: Long = 1
185 ): ConfigurationProviderImpl {
186
187     val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
188
189     return ConfigurationProviderImpl(
190             cbsClientMono,
191             firstRequestDelay,
192             requestInterval,
193             healthState,
194             streamParser,
195             retry
196     )
197 }