2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018-2019 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.config.impl
 
  22 import arrow.core.Some
 
  23 import com.google.gson.JsonParser
 
  24 import com.nhaarman.mockitokotlin2.any
 
  25 import com.nhaarman.mockitokotlin2.eq
 
  26 import com.nhaarman.mockitokotlin2.mock
 
  27 import com.nhaarman.mockitokotlin2.times
 
  28 import com.nhaarman.mockitokotlin2.verify
 
  29 import com.nhaarman.mockitokotlin2.whenever
 
  30 import org.assertj.core.api.Assertions.assertThat
 
  31 import org.jetbrains.spek.api.Spek
 
  32 import org.jetbrains.spek.api.dsl.describe
 
  33 import org.jetbrains.spek.api.dsl.given
 
  34 import org.jetbrains.spek.api.dsl.it
 
  35 import org.jetbrains.spek.api.dsl.on
 
  36 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
 
  37 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 
  38 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 
  40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
 
  41 import reactor.core.publisher.Flux
 
  42 import reactor.core.publisher.Mono
 
  43 import reactor.retry.Retry
 
  44 import reactor.test.StepVerifier
 
  45 import java.time.Duration
 
  48  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 
  51 internal object CbsConfigurationProviderTest : Spek({
 
  53     describe("Configuration provider") {
 
  55         val cbsClient = mock<CbsClient>()
 
  56         val cbsClientMock = Mono.just(cbsClient)
 
  57         val configStateListener = mock<ConfigurationStateListener>()
 
  59         given("configuration is never in cbs") {
 
  60             val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
 
  62             on("waiting for configuration") {
 
  63                 val waitTime = Duration.ofMillis(100)
 
  65                 it("should not get it") {
 
  66                     StepVerifier.create(configProvider().take(1))
 
  67                             .expectNoEvent(waitTime)
 
  72         given("valid configuration from cbs") {
 
  73             val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
 
  75             on("new configuration") {
 
  76                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
 
  77                         .thenReturn(Flux.just(validConfiguration))
 
  78                 it("should use received configuration") {
 
  80                     StepVerifier.create(configProvider().take(1))
 
  83                                 assertThat(it.listenPort).isEqualTo(Some(6061))
 
  84                                 assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
 
  85                                 assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576))
 
  88                                 val sinks = it.streamPublishers.orNull()!!
 
  92                                 assertThat(sink1.name()).isEqualTo(PERF3GPP_REGIONAL)
 
  93                                 assertThat(sink1.aafCredentials()).isEqualTo(aafCredentials1)
 
  94                                 assertThat(sink1.bootstrapServers())
 
  95                                         .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
 
  96                                 assertThat(sink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
 
  98                                 assertThat(sink2.name()).isEqualTo(PERF3GPP_CENTRAL)
 
  99                                 assertThat(sink2.aafCredentials()).isEqualTo(aafCredentials2)
 
 100                                 assertThat(sink2.bootstrapServers())
 
 101                                         .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
 
 102                                 assertThat(sink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
 
 108         given("invalid configuration from cbs") {
 
 109             val iterationCount = 3L
 
 110             val configProvider = constructConfigurationProvider(
 
 111                     cbsClientMock, configStateListener, iterationCount
 
 114             on("new configuration") {
 
 115                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
 
 116                         .thenReturn(Flux.just(invalidConfiguration))
 
 118                 it("should interrupt the flux") {
 
 120                             .create(configProvider())
 
 124                 it("should call state listener when retrying") {
 
 125                     verify(configStateListener, times(iterationCount.toInt())).retrying()
 
 134 private const val PERF3GPP_REGIONAL = "perf3gpp_regional"
 
 135 private const val PERF3GPP_CENTRAL = "perf3gpp_central"
 
 137 private val aafCredentials1 = ImmutableAafCredentials.builder()
 
 139         .password("very secure password")
 
 142 private val aafCredentials2 = ImmutableAafCredentials.builder()
 
 143         .username("other_client")
 
 144         .password("another very secure password")
 
 147 private val validConfiguration = JsonParser().parse("""
 
 149     "server.listenPort": 6061,
 
 150     "server.idleTimeoutSec": 60,
 
 151     "server.maxPayloadSizeBytes": 1048576,
 
 152     "streams_publishes": {
 
 153         "$PERF3GPP_REGIONAL": {
 
 156                 "username": "client",
 
 157                 "password": "very secure password"
 
 160                 "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
 
 161                 "topic_name": "REG_HVVES_PERF3GPP"
 
 164         "$PERF3GPP_CENTRAL": {
 
 167                 "username": "other_client",
 
 168                 "password": "another very secure password"
 
 171                 "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
 
 172                 "topic_name": "CEN_HVVES_PERF3GPP"
 
 178 private val invalidConfiguration = JsonParser().parse("""
 
 180     "streams_publishes": {
 
 181         "$PERF3GPP_REGIONAL": {
 
 185                 "password": "very secure password"
 
 188                 "servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
 
 189                 "name": "REG_HVVES_PERF3GPP"
 
 195 private val firstRequestDelay = Duration.ofMillis(1)
 
 196 private val requestInterval = Duration.ofMillis(1)
 
 197 private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
 
 198 private val configParser = JsonConfigurationParser()
 
 200 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
 
 201                                            configurationStateListener: ConfigurationStateListener,
 
 202                                            iterationCount: Long = 1
 
 203 ): CbsConfigurationProvider {
 
 206             .onlyIf<Any> { it.iteration() <= iterationCount }
 
 207             .fixedBackoff(Duration.ofNanos(1))
 
 209     return CbsConfigurationProvider(
 
 211             CbsConfiguration(firstRequestDelay, requestInterval),
 
 214             configurationStateListener,
 
 215             { mapOf("k" to "v") },