94eb519d16c5fd7a2363afe7c89f22bdad260d06
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.config.impl
21
22 import arrow.core.Some
23 import com.google.gson.JsonParser
24 import com.nhaarman.mockitokotlin2.any
25 import com.nhaarman.mockitokotlin2.eq
26 import com.nhaarman.mockitokotlin2.mock
27 import com.nhaarman.mockitokotlin2.times
28 import com.nhaarman.mockitokotlin2.verify
29 import com.nhaarman.mockitokotlin2.whenever
30 import org.assertj.core.api.Assertions.assertThat
31 import org.jetbrains.spek.api.Spek
32 import org.jetbrains.spek.api.dsl.describe
33 import org.jetbrains.spek.api.dsl.given
34 import org.jetbrains.spek.api.dsl.it
35 import org.jetbrains.spek.api.dsl.on
36 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
37 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
38 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
41 import reactor.core.publisher.Flux
42 import reactor.core.publisher.Mono
43 import reactor.retry.Retry
44 import reactor.test.StepVerifier
45 import java.time.Duration
46
/**
 * Specification of [CbsConfigurationProvider] driven by a mocked [CbsClient].
 *
 * Three scenarios are covered: CBS never delivers a configuration, CBS delivers a
 * configuration that parses correctly, and CBS delivers one that does not.
 *
 * The [CbsClient] and [ConfigurationStateListener] mocks are shared by all scenarios, so
 * every scenario stubs `cbsClient.updates(...)` itself before subscribing.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal object CbsConfigurationProviderTest : Spek({

    describe("Configuration provider") {

        val cbsClient = mock<CbsClient>()
        val cbsClientMock = Mono.just(cbsClient)
        val configStateListener = mock<ConfigurationStateListener>()

        given("configuration is never in cbs") {
            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)

            on("waiting for configuration") {
                // Model "never in CBS" explicitly: an unstubbed mock would return null
                // instead of a stream that simply stays silent.
                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
                        .thenReturn(Flux.never())
                val waitTime = Duration.ofMillis(100)

                it("should not get it") {
                    // A StepVerifier does nothing until verify() is called, and
                    // expectNoEvent treats the subscription signal as an event, hence
                    // the expectSubscription() / thenCancel() / verify() bracket.
                    StepVerifier.create(configProvider().take(1))
                            .expectSubscription()
                            .expectNoEvent(waitTime)
                            .thenCancel()
                            .verify()
                }
            }
        }

        given("valid configuration from cbs") {
            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)

            on("new configuration") {
                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
                        .thenReturn(Flux.just(validConfiguration))

                it("should use received configuration") {
                    StepVerifier.create(configProvider().take(1))
                            .consumeNextWith { config ->
                                assertThat(config.listenPort).isEqualTo(Some(6061))
                                assertThat(config.idleTimeoutSec).isEqualTo(Some(60L))
                                assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576))

                                val sinks = checkNotNull(config.streamPublishers.orNull()) {
                                    "streamPublishers should be present in the parsed configuration"
                                }
                                // Indexing assumes the sinks keep the order in which the
                                // streams are declared in the JSON fixture.
                                val regionalSink = sinks[0]
                                val centralSink = sinks[1]

                                assertThat(regionalSink.name()).isEqualTo(PERF3GPP_REGIONAL)
                                assertThat(regionalSink.aafCredentials()).isEqualTo(aafCredentials1)
                                assertThat(regionalSink.bootstrapServers())
                                        .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
                                assertThat(regionalSink.topicName()).isEqualTo("REG_HVVES_PERF3GPP")

                                assertThat(centralSink.name()).isEqualTo(PERF3GPP_CENTRAL)
                                assertThat(centralSink.aafCredentials()).isEqualTo(aafCredentials2)
                                assertThat(centralSink.bootstrapServers())
                                        .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
                                assertThat(centralSink.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
                            }.verifyComplete()
                }
            }
        }

        given("invalid configuration from cbs") {
            val iterationCount = 3L
            val configProvider = constructConfigurationProvider(
                    cbsClientMock, configStateListener, iterationCount
            )

            on("new configuration") {
                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
                        .thenReturn(Flux.just(invalidConfiguration))

                it("should interrupt the flux") {
                    StepVerifier
                            .create(configProvider())
                            .verifyError()
                }

                // Performs no subscription of its own: it inspects the interactions
                // recorded on the shared listener mock, so it relies on the preceding
                // test having run and on the other scenarios not triggering retries
                // (NOTE(review): confirm against CbsConfigurationProvider).
                it("should call state listener when retrying") {
                    verify(configStateListener, times(iterationCount.toInt())).retrying()
                }
            }
        }
    }

})
132
133
/** Key of the regional stream in the `streams_publishes` fixtures; also the expected sink name. */
private const val PERF3GPP_REGIONAL: String = "perf3gpp_regional"

/** Key of the central stream in the `streams_publishes` fixtures; also the expected sink name. */
private const val PERF3GPP_CENTRAL: String = "perf3gpp_central"
136
/** Credentials expected on the regional sink; mirrors `aaf_credentials` of the regional stream fixture. */
private val aafCredentials1 = with(ImmutableAafCredentials.builder()) {
    username("client")
    password("very secure password")
    build()
}

/** Credentials expected on the central sink; mirrors `aaf_credentials` of the central stream fixture. */
private val aafCredentials2 = with(ImmutableAafCredentials.builder()) {
    username("other_client")
    password("another very secure password")
    build()
}
146
/**
 * CBS response fixture carrying the three `server.*` settings and two kafka streams
 * (regional and central), each with AAF credentials and kafka connection details.
 */
private val validConfiguration = """
{
    "server.listenPort": 6061,
    "server.idleTimeoutSec": 60,
    "server.maxPayloadSizeBytes": 1048576,
    "streams_publishes": {
        "$PERF3GPP_REGIONAL": {
            "type": "kafka",
            "aaf_credentials": {
                "username": "client",
                "password": "very secure password"
            },
            "kafka_info": {
                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
                "topic_name": "REG_HVVES_PERF3GPP"
            }
        },
        "$PERF3GPP_CENTRAL": {
            "type": "kafka",
            "aaf_credentials": {
                "username": "other_client",
                "password": "another very secure password"
            },
            "kafka_info": {
                "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
                "topic_name": "CEN_HVVES_PERF3GPP"
            }
        }
    }
}""".let { json -> JsonParser().parse(json).asJsonObject }
177
/**
 * CBS response fixture used by the failure scenario: the single kafka stream uses the
 * keys `user`, `servers` and `name` where the valid fixture uses `username`,
 * `bootstrap_servers` and `topic_name`.
 */
private val invalidConfiguration = """
{
    "streams_publishes": {
        "$PERF3GPP_REGIONAL": {
            "type": "kafka",
            "aaf_credentials": {
                "user": "client",
                "password": "very secure password"
            },
            "kafka_info": {
                "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
                "name": "REG_HVVES_PERF3GPP"
            }
        }
    }
}""".let { json -> JsonParser().parse(json).asJsonObject }
194
/** First-request delay handed to `CbsConfiguration` and matched when stubbing `CbsClient.updates`. */
private val firstRequestDelay: Duration = Duration.ofMillis(1L)

/** Request interval handed to `CbsConfiguration` and matched when stubbing `CbsClient.updates`. */
private val requestInterval: Duration = Duration.ofMillis(1L)

/** Kafka sink stream parser passed to the provider under test. */
private val streamParser = StreamFromGsonParsers.kafkaSinkParser()

/** JSON configuration parser passed to the provider under test. */
private val configParser = JsonConfigurationParser()
199
/**
 * Creates the [CbsConfigurationProvider] under test from the shared fixtures.
 *
 * The provider gets a retry specification that keeps retrying while the retry context's
 * iteration number is at most [iterationCount], with a fixed one-nanosecond backoff.
 *
 * @param cbsClientMono source of the CBS client handed to the provider
 * @param configurationStateListener listener handed to the provider
 * @param iterationCount highest retry iteration that is still allowed; defaults to 1
 * @return a provider wired with the file-level delay, interval and parser fixtures
 */
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
                                           configurationStateListener: ConfigurationStateListener,
                                           iterationCount: Long = 1
): CbsConfigurationProvider = CbsConfigurationProvider(
        cbsClientMono,
        CbsConfiguration(firstRequestDelay, requestInterval),
        configParser,
        streamParser,
        configurationStateListener,
        { mapOf("k" to "v") },
        Retry
                .onlyIf<Any> { retryContext -> retryContext.iteration() <= iterationCount }
                .fixedBackoff(Duration.ofNanos(1))
)