Use CBS by means of SDK in place of Consul
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / ConfigurationProviderTest.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
+import com.google.gson.JsonParser
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.eq
 import com.nhaarman.mockitokotlin2.mock
@@ -29,11 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.mockito.Mockito
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Flux
 
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
@@ -44,24 +46,36 @@ import java.time.Duration
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
-internal object ConsulConfigurationProviderTest : Spek({
+internal object ConfigurationProviderImplTest : Spek({
 
-    describe("Consul configuration provider") {
+    describe("Configuration provider") {
 
-        val httpAdapterMock: HttpAdapter = mock()
+        val cbsClient: CbsClient = mock()
+        val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
         val healthStateProvider = HealthState.INSTANCE
 
-        given("valid resource url") {
-            val validUrl = "http://valid-url/"
-            val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
+        given("configuration is never in cbs") {
+            val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
 
-            on("call to consul") {
-                whenever(httpAdapterMock.get(eq(validUrl), any(), Mockito.anyMap()))
-                        .thenReturn(Mono.just(constructConsulResponse()))
+            on("waiting for configuration") {
+                val waitTime = Duration.ofMillis(100)
 
+                it("should not get it") {
+                    StepVerifier.create(configProvider().take(1))
+                            .expectNoEvent(waitTime)
+                }
+            }
+
+        }
+        given("valid configuration from cbs") {
+            val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+
+            on("new configuration") {
+                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+                        .thenReturn(Flux.just(validConfiguration))
                 it("should use received configuration") {
 
-                    StepVerifier.create(consulConfigProvider().take(1))
+                    StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
 
                                 val route1 = it.routing.routes[0]
@@ -85,22 +99,19 @@ internal object ConsulConfigurationProviderTest : Spek({
             }
 
         }
-        given("invalid resource url") {
-            val invalidUrl = "http://invalid-url/"
-
+        given("invalid configuration from cbs") {
             val iterationCount = 3L
-            val consulConfigProvider = constructConsulConfigProvider(
-                    invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+            val configProvider = constructConfigurationProvider(
+                    cbsClientMock, healthStateProvider, iterationCount
             )
 
-            on("call to consul") {
-                whenever(httpAdapterMock.get(eq(invalidUrl), any(), Mockito.anyMap()))
-                        .thenReturn(Mono.error(RuntimeException("Test exception")))
+            on("new configuration") {
+                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+                        .thenReturn(Flux.just(invalidConfiguration))
 
                 it("should interrupt the flux") {
-
-                    StepVerifier.create(consulConfigProvider())
-                            .verifyErrorMessage("Test exception")
+                    StepVerifier.create(configProvider())
+                            .verifyError()
                 }
 
                 it("should update the health state") {
@@ -115,28 +126,9 @@ internal object ConsulConfigurationProviderTest : Spek({
 
 })
 
-private fun constructConsulConfigProvider(url: String,
-                                          httpAdapter: HttpAdapter,
-                                          healthState: HealthState,
-                                          iterationCount: Long = 1
-): ConsulConfigurationProvider {
-
-    val firstRequestDelay = Duration.ofMillis(1)
-    val requestInterval = Duration.ofMillis(1)
-    val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
 
-    return ConsulConfigurationProvider(
-            httpAdapter,
-            url,
-            firstRequestDelay,
-            requestInterval,
-            healthState,
-            retry
-    )
-}
-
-fun constructConsulResponse(): String =
-    """{
+private val validConfiguration = JsonParser().parse("""
+{
     "whatever": "garbage",
     "collector.routing": [
             {
@@ -148,4 +140,34 @@ fun constructConsulResponse(): String =
                 "toTopic": "test-topic-2"
             }
     ]
-    }"""
+}""").asJsonObject
+
+private val invalidConfiguration = JsonParser().parse("""
+{
+    "whatever": "garbage",
+    "collector.routing": [
+            {
+                "fromDomain": "garbage",
+                "meaningful": "garbage"
+            }
+    ]
+}""").asJsonObject
+
+private val firstRequestDelay = Duration.ofMillis(1)
+private val requestInterval = Duration.ofMillis(1)
+
+private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
+                                           healthState: HealthState,
+                                           iterationCount: Long = 1
+): ConfigurationProviderImpl {
+
+    val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+    return ConfigurationProviderImpl(
+            cbsClientMono,
+            firstRequestDelay,
+            requestInterval,
+            healthState,
+            retry
+    )
+}