2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018-2019 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.impl.adapters
 
  22 import com.google.gson.JsonParser
 
  23 import com.nhaarman.mockitokotlin2.any
 
  24 import com.nhaarman.mockitokotlin2.eq
 
  25 import com.nhaarman.mockitokotlin2.mock
 
  26 import com.nhaarman.mockitokotlin2.whenever
 
  27 import org.assertj.core.api.Assertions.assertThat
 
  28 import org.jetbrains.spek.api.Spek
 
  29 import org.jetbrains.spek.api.dsl.describe
 
  30 import org.jetbrains.spek.api.dsl.given
 
  31 import org.jetbrains.spek.api.dsl.it
 
  32 import org.jetbrains.spek.api.dsl.on
 
  33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 
  34 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 
  35 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 
  36 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 
  38 import reactor.core.publisher.Flux
 
  40 import reactor.core.publisher.Mono
 
  41 import reactor.retry.Retry
 
  42 import reactor.test.StepVerifier
 
  43 import java.time.Duration
 
  46  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 
  // Spek specification for ConfigurationProviderImpl, driven through a mocked CbsClient.
  // Three scenarios are described: configuration never available, a valid configuration,
  // and an invalid configuration (which also checks the reported health state).
  // NOTE(review): several lines of this spec (closing braces and at least some chained
  // calls) are not visible in this excerpt; the comments below describe only what is shown.
  49 internal object ConfigurationProviderImplTest : Spek({
 
  51     describe("Configuration provider") {
 
             // Mockito mock standing in for the CBS client; it is stubbed per scenario below.
  53         val cbsClient: CbsClient = mock()
 
             // Mono that emits the mocked client; this is what the provider under test receives.
  54         val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
 
             // NOTE(review): HealthState.INSTANCE looks like a shared instance, so health updates
             // may be visible across scenarios - confirm.
  55         val healthStateProvider = HealthState.INSTANCE
 
             // Scenario 1: no `cbsClient.updates(...)` stubbing appears before this point in the spec.
  57         given("configuration is never in cbs") {
 
                 // Built with the helper's default iterationCount (1).
  58             val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
 
  60             on("waiting for configuration") {
 
                     // Window during which the provider is expected to stay silent.
  61                 val waitTime = Duration.ofMillis(100)
 
  63                 it("should not get it") {
 
                         // NOTE(review): no terminal verify* call is visible after this chain in the
                         // excerpt. StepVerifier only subscribes and checks expectations once a
                         // verify method is invoked - confirm one follows in the full file.
  64                     StepVerifier.create(configProvider().take(1))
 
  65                             .expectNoEvent(waitTime)
 
             // Scenario 2: the mocked client delivers `validConfiguration`.
  70         given("valid configuration from cbs") {
 
  71             val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
 
  73             on("new configuration") {
 
                     // Stub `updates` for any first argument, but only for exactly these delay and
                     // interval values, to emit the valid payload once.
  74                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
 
  75                         .thenReturn(Flux.just(validConfiguration))
 
  76                 it("should use received configuration") {
 
                         // Only the first emitted element is examined.
  78                     StepVerifier.create(configProvider().take(1))
 
                                     // `it` is presumably the emitted configuration; the line opening
                                     // the enclosing lambda is not visible in this excerpt.
                                     // NOTE(review): AssertJ's convention is
                                     // assertThat(actual).isEqualTo(expected); here the expected value
                                     // is the subject. The pass/fail outcome is the same, but failure
                                     // messages will label the two values the other way round.
                                     // First route: FAULT domain -> "test-topic-1".
  81                                 val route1 = it.routes[0]
 
  82                                 assertThat(FAULT.domainName)
 
  83                                         .describedAs("routed domain 1")
 
  84                                         .isEqualTo(route1.domain)
 
  85                                 assertThat("test-topic-1")
 
  86                                         .describedAs("target topic 1")
 
  87                                         .isEqualTo(route1.targetTopic)
 
                                     // Second route: HEARTBEAT domain -> "test-topic-2".
  89                                 val route2 = it.routes[1]
 
  90                                 assertThat(HEARTBEAT.domainName)
 
  91                                         .describedAs("routed domain 2")
 
  92                                         .isEqualTo(route2.domain)
 
  93                                 assertThat("test-topic-2")
 
  94                                         .describedAs("target topic 2")
 
  95                                         .isEqualTo(route2.targetTopic)
 
             // Scenario 3: the mocked client delivers `invalidConfiguration`.
 102         given("invalid configuration from cbs") {
 
                 // Passed to the helper, where it bounds the retry predicate
                 // (`it.iteration() <= iterationCount`); also used below for the health-state check.
 103             val iterationCount = 3L
 
 104             val configProvider = constructConfigurationProvider(
 
 105                     cbsClientMock, healthStateProvider, iterationCount
 
 108             on("new configuration") {
 
                     // Same stubbing as in the valid scenario, but emitting the invalid payload.
 109                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
 
 110                         .thenReturn(Flux.just(invalidConfiguration))
 
 112                 it("should interrupt the flux") {
 
                         // The expectation/verify step that follows this line is not visible in the excerpt.
 113                     StepVerifier.create(configProvider())
 
 117                 it("should update the health state") {
 
                         // Takes `iterationCount` elements from the health state stream, accepts the
                         // first `iterationCount - 1` without inspecting them, then requires the next
                         // one to be RETRYING_FOR_DYNAMIC_CONFIGURATION.
                         // NOTE(review): presumably `healthStateProvider()` yields a Flux of
                         // HealthDescription - confirm against HealthState.
 118                     StepVerifier.create(healthStateProvider().take(iterationCount))
 
 119                             .expectNextCount(iterationCount - 1)
 
 120                             .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
 
 // Test fixtures: CBS configuration payloads written as raw JSON strings and parsed with
 // Gson's JsonParser.
 //
 // validConfiguration: carries an unrelated "whatever" key plus a "collector.routing" array
 // with two entries, "fault" -> "test-topic-1" and "heartbeat" -> "test-topic-2". These are the
 // values the "valid configuration from cbs" scenario asserts on.
 //
 // invalidConfiguration (declared further down in this block): same outer shape, but the visible
 // routing entry has "fromDomain": "garbage" and a "meaningful" key; no "toTopic" key is visible.
 // It feeds the "invalid configuration from cbs" scenario.
 //
 // NOTE(review): the "whatever" key presumably checks that unknown keys are ignored - confirm.
 // NOTE(review): the brace lines and the closing of both raw strings are not visible in this excerpt.
 130 private val validConfiguration = JsonParser().parse("""
 
 132     "whatever": "garbage",
 
 133     "collector.routing": [
 
 135                 "fromDomain": "fault",
 
 136                 "toTopic": "test-topic-1"
 
 139                 "fromDomain": "heartbeat",
 
 140                 "toTopic": "test-topic-2"
 
 145 private val invalidConfiguration = JsonParser().parse("""
 
 147     "whatever": "garbage",
 
 148     "collector.routing": [
 
 150                 "fromDomain": "garbage",
 
 151                 "meaningful": "garbage"
 
 // Timing values (1 ms each) used by the spec when stubbing `cbsClient.updates(...)`. Both are
 // matched with `eq(...)`, so the stub only applies when exactly these durations are passed.
 // NOTE(review): presumably constructConfigurationProvider hands the same values to
 // ConfigurationProviderImpl - its body is not visible in this excerpt, confirm.
 156 private val firstRequestDelay = Duration.ofMillis(1)
 
 157 private val requestInterval = Duration.ofMillis(1)
 
 159 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
 
 160                                            healthState: HealthState,
 
 161                                            iterationCount: Long = 1
 
 162 ): ConfigurationProviderImpl {
 
 164     val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
 
 166     return ConfigurationProviderImpl(