d5fe588edf5c322947d4f511c5b1a250374c30e8
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.config.impl
21
22 import com.google.gson.JsonParser
23 import com.nhaarman.mockitokotlin2.any
24 import com.nhaarman.mockitokotlin2.eq
25 import com.nhaarman.mockitokotlin2.mock
26 import com.nhaarman.mockitokotlin2.times
27 import com.nhaarman.mockitokotlin2.verify
28 import com.nhaarman.mockitokotlin2.whenever
29 import org.assertj.core.api.Assertions.assertThat
30 import org.jetbrains.spek.api.Spek
31 import org.jetbrains.spek.api.dsl.describe
32 import org.jetbrains.spek.api.dsl.given
33 import org.jetbrains.spek.api.dsl.it
34 import org.jetbrains.spek.api.dsl.on
35 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
36 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
37 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
40 import reactor.core.publisher.Flux
41 import reactor.core.publisher.Mono
42 import reactor.retry.Retry
43 import reactor.test.StepVerifier
44 import java.time.Duration
45
/**
 * Spek specification for [CbsConfigurationProvider].
 *
 * All scenarios share one mocked [CbsClient] and one mocked [ConfigurationStateListener];
 * each `on` block stubs `cbsClient.updates(...)` with the stream that scenario needs.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal object CbsConfigurationProviderTest : Spek({

    describe("Configuration provider") {

        val cbsClient: CbsClient = mock()
        val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
        val configStateListener: ConfigurationStateListener = mock()

        given("configuration is never in cbs") {
            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)

            on("waiting for configuration") {
                val waitTime = Duration.ofMillis(100)

                // Make the "never delivers a configuration" precondition explicit:
                // the CBS client hands out a stream that emits nothing at all.
                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
                        .thenReturn(Flux.never())

                it("should not get it") {
                    // A StepVerifier only subscribes and checks its expectations once verify()
                    // is called; without it the scenario asserts nothing. The subscription is
                    // consumed first because it counts as an event for expectNoEvent, and the
                    // never-ending stream is cancelled so that verify() can return.
                    StepVerifier.create(configProvider().take(1))
                            .expectSubscription()
                            .expectNoEvent(waitTime)
                            .thenCancel()
                            .verify()
                }
            }
        }

        given("valid configuration from cbs") {
            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)

            on("new configuration") {
                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
                        .thenReturn(Flux.just(validConfiguration))
                it("should use received configuration") {

                    StepVerifier.create(configProvider().take(1))
                            .consumeNextWith { receivedConfiguration ->
                                // Fail with a readable message instead of a bare NPE
                                // when the received configuration carries no routing.
                                val routes = checkNotNull(receivedConfiguration.routing.orNull()) {
                                    "routing should be present in the received configuration"
                                }
                                val route1 = routes.elementAt(0)
                                val route2 = routes.elementAt(1)
                                val receivedSink1 = route1.sink
                                val receivedSink2 = route2.sink

                                assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
                                assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
                                assertThat(receivedSink1.bootstrapServers())
                                        .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
                                assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")

                                assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
                                assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
                                assertThat(receivedSink2.bootstrapServers())
                                        .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
                                assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")

                            }.verifyComplete()
                }
            }

        }
        given("invalid configuration from cbs") {
            val iterationCount = 3L
            val configProvider = constructConfigurationProvider(
                    cbsClientMock, configStateListener, iterationCount
            )

            on("new configuration") {
                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
                        .thenReturn(Flux.just(invalidConfiguration))

                it("should interrupt the flux") {
                    StepVerifier.create(configProvider())
                            .verifyError()
                }

                // Relies on the subscription made by the previous test of this `on` block;
                // the listener mock is shared by every scenario in this specification.
                it("should call state listener when retrying") {
                    verify(configStateListener, times(iterationCount.toInt())).retrying()
                }
            }
        }
    }

})
127
128
/** Stream name used as a `streams_publishes` key in the JSON fixtures and expected as a route domain. */
val PERF3GPP_REGIONAL: String = "perf3gpp_regional"

/** Second stream name used as a `streams_publishes` key in the JSON fixtures and expected as a route domain. */
val PERF3GPP_CENTRAL: String = "perf3gpp_central"
131
/** AAF credentials carrying the same username and password as the regional sink of the valid JSON fixture. */
private val aafCredentials1 = ImmutableAafCredentials.builder().run {
    username("client")
    password("very secure password")
    build()
}
136
/** AAF credentials carrying the same username and password as the central sink of the valid JSON fixture. */
private val aafCredentials2 = ImmutableAafCredentials.builder().run {
    username("other_client")
    password("another very secure password")
    build()
}
141
/**
 * JSON fixture with two kafka sinks under `streams_publishes`, keyed by
 * [PERF3GPP_REGIONAL] and [PERF3GPP_CENTRAL], parsed into a Gson JSON object.
 */
private val validConfiguration = """
{
    "streams_publishes": {
        "$PERF3GPP_REGIONAL": {
            "type": "kafka",
            "aaf_credentials": {
                "username": "client",
                "password": "very secure password"
            },
            "kafka_info": {
                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
                "topic_name": "REG_HVVES_PERF3GPP"
            }
        },
        "$PERF3GPP_CENTRAL": {
            "type": "kafka",
            "aaf_credentials": {
                "username": "other_client",
                "password": "another very secure password"
            },
            "kafka_info": {
                "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
                "topic_name": "CEN_HVVES_PERF3GPP"
            }
        }
    }
}""".let { rawJson -> JsonParser().parse(rawJson).asJsonObject }
169
/**
 * JSON fixture with a single kafka sink keyed by [PERF3GPP_REGIONAL], parsed into a Gson JSON object.
 *
 * Its `kafka_info` section has a `popic_name` key where the valid fixture in this file has
 * `topic_name`; the key must stay exactly as written.
 */
private val invalidConfiguration = """
{
    "streams_publishes": {
        "$PERF3GPP_REGIONAL": {
            "type": "kafka",
            "aaf_credentials": {
                "username": "client",
                "password": "very secure password"
            },
            "kafka_info": {
                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
                "popic_name": "REG_HVVES_PERF3GPP"
            }
        }
    }
}""".let { rawJson -> JsonParser().parse(rawJson).asJsonObject }
186
/** First-request delay passed to [CbsConfiguration] and matched in the `cbsClient.updates` stubs. */
private val firstRequestDelay: Duration = Duration.ofMillis(1L)

/** Request interval passed to [CbsConfiguration] and matched in the `cbsClient.updates` stubs. */
private val requestInterval: Duration = Duration.ofMillis(1L)

/** Kafka sink stream parser handed to every provider built by this file. */
private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
190
/**
 * Builds the [CbsConfigurationProvider] under test.
 *
 * The provider receives the supplied client and listener, a [CbsConfiguration] made of the
 * file-level [firstRequestDelay] and [requestInterval], the file-level [streamParser], and a
 * retry policy that allows a retry while the retry iteration number does not exceed
 * [iterationCount], with a fixed one-nanosecond backoff.
 *
 * @param cbsClientMono source of the CBS client handed to the provider
 * @param configurationStateListener listener handed to the provider
 * @param iterationCount highest retry iteration for which the retry policy still allows a retry
 * @return the configured provider
 */
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
                                           configurationStateListener: ConfigurationStateListener,
                                           iterationCount: Long = 1
): CbsConfigurationProvider {
    val backoff = Duration.ofNanos(1)
    val retryPolicy = Retry
            .onlyIf<Any> { context -> context.iteration() <= iterationCount }
            .fixedBackoff(backoff)
    val cbsConfiguration = CbsConfiguration(firstRequestDelay, requestInterval)

    return CbsConfigurationProvider(
            cbsClientMono,
            cbsConfiguration,
            streamParser,
            configurationStateListener,
            retryPolicy,
            // NOTE(review): fixed single-entry map supplier; presumably diagnostic context - confirm.
            { mapOf("k" to "v") }
    )
}