Retry infinitely 38/89738/2
authorFilip Krzywka <filip.krzywka@nokia.com>
Mon, 10 Jun 2019 07:02:43 +0000 (09:02 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Wed, 12 Jun 2019 11:59:55 +0000 (13:59 +0200)
- changed specification to retry infinitely
- moved MDC to cbsAdapter constructor as in whole module it contains
only local context (instanceID etc.). Also permuted constructor params
to match ConfigurationProviders order
- refactored module tests as ground for future enhancements

Change-Id: Ic074b9c421b60662e5512c55c7b1dfb90ab0d2ea
Issue-ID: DCAEGEN2-1557
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt

index e243afe..35adfe7 100644 (file)
@@ -55,7 +55,6 @@ class ConfigurationModule internal constructor(private val configStateListener:
             CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
     )
 
-
     fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
 
     fun hvVesConfigurationUpdates(args: Array<String>,
@@ -67,7 +66,7 @@ class ConfigurationModule internal constructor(private val configStateListener:
                     .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
                     .cache()
                     .flatMapMany { basePartialConfig ->
-                        cbsClientAdapter(basePartialConfig).let { cbsClientAdapter ->
+                        cbsClientAdapter(basePartialConfig, mdc).let { cbsClientAdapter ->
                             cbsConfigurationProvider(cbsClientAdapter, mdc)
                                     .invoke()
                                     .map { configMerger.merge(basePartialConfig, it) }
@@ -75,27 +74,28 @@ class ConfigurationModule internal constructor(private val configStateListener:
                                     .throwOnLeft()
                                     .map(configTransformer::toFinalConfiguration)
                                     .doOnNext {
-                                        cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc)
+                                        cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval)
                                     }
                         }
                     }
 
-    private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) =
-            CbsClientAdapter(
-                    cbsClient,
-                    configStateListener,
-                    cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
-                    retrySpec
-            )
+    private fun cbsClientAdapter(basePartialConfig: PartialConfiguration,
+                                 mdc: MappedDiagnosticContext) = CbsClientAdapter(
+            cbsClient,
+            cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
+            configStateListener,
+            mdc,
+            infiniteRetry
+    )
 
     private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
-                                         mdc: MappedDiagnosticContext) =
-            CbsConfigurationProvider(
-                    cbsClientAdapter,
-                    configParser,
-                    configStateListener,
-                    mdc,
-                    retrySpec)
+                                         mdc: MappedDiagnosticContext) = CbsConfigurationProvider(
+            cbsClientAdapter,
+            configParser,
+            configStateListener,
+            mdc,
+            infiniteRetry
+    )
 
     private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
             configValidator.validatedCbsConfiguration(basePartialConfig)
@@ -104,11 +104,11 @@ class ConfigurationModule internal constructor(private val configStateListener:
     companion object {
         private val logger = Logger(ConfigurationModule::class)
 
-        private const val MAX_RETRIES = 5L
-        private const val INITIAL_BACKOFF = 10L
-        private val retrySpec: Retry<Any> = Retry.any<Any>()
-                .retryMax(MAX_RETRIES)
-                .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF))
+        private val FIRST_BACKOFF_DURATION = Duration.ofSeconds(5)
+        private val MAX_BACKOFF_DURATION = Duration.ofMinutes(5)
+        private val infiniteRetry: Retry<Any> = Retry.any<Any>()
+                .retryMax(Long.MAX_VALUE)
+                .exponentialBackoff(FIRST_BACKOFF_DURATION, MAX_BACKOFF_DURATION)
                 .jitter(Jitter.random())
     }
 
index d31f658..8b7ed67 100644 (file)
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl
 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
-import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
 import org.onap.dcae.collectors.veshv.utils.rx.delayElements
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
@@ -35,26 +34,31 @@ import java.util.concurrent.atomic.AtomicReference
 
 
 internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
-                                private val configurationStateListener: ConfigurationStateListener,
                                 private val firstRequestDelay: Duration,
-                                private val retrySpec: Retry<Any>) {
+                                private val configurationStateListener: ConfigurationStateListener,
+                                private val mdc: MappedDiagnosticContext,
+                                retrySpec: Retry<Any>) {
 
     private val requestInterval = AtomicReference<Duration>(Duration.ZERO)
+    private val retry = retrySpec.doOnRetry {
+        logger.withWarn(mdc) {
+            log("Exception while creating CBS client, retrying. Reason: ${it.exception().localizedMessage}")
+        }
+        configurationStateListener.retrying()
+    }
 
-    fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono
+    fun configurationUpdates() = cbsClientMono
             .doOnNext {
                 logger.info(mdc) {
                     "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s"
                 }
             }
-            .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
-            .retryWhen(retry(mdc))
+            .retryWhen(retry)
             .delayElement(firstRequestDelay)
             .flatMapMany(::toPeriodicalConfigurations)
             .distinctUntilChanged()
 
-    fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) {
-        requestInterval.set(intervalUpdate)
+    fun updateCbsInterval(intervalUpdate: Duration) = requestInterval.set(intervalUpdate).also {
         logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" }
     }
 
@@ -67,15 +71,7 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
 
     private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
 
-    private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry {
-        logger.withWarn(mdc) {
-            log("Exception from HV-VES cbs client, retrying subscription", it.exception())
-        }
-        configurationStateListener.retrying()
-    }
-
     companion object {
         private val logger = Logger(CbsClientAdapter::class)
     }
-
 }
index 6efa38e..6f16b3d 100644 (file)
@@ -55,7 +55,7 @@ internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientA
     }
 
     operator fun invoke(): Flux<PartialConfiguration> =
-            cbsClientAdapter.configurationUpdates(mdc)
+            cbsClientAdapter.configurationUpdates()
                     .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
                     .map(::parseConfiguration)
                     .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
index 1b2dbc2..9303920 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.config.api
 
-import arrow.core.Option
-import com.google.gson.JsonParser
+import arrow.core.Some
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.reset
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
 import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.*
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.config.impl.mdc
 import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import reactor.core.publisher.Mono
 import reactor.test.StepVerifier
 import java.time.Duration
 
 
 internal object ConfigurationModuleIT : Spek({
-    describe("configuration module") {
-        val cbsClientMock = mock<CbsClient>()
-        val configStateListenerMock = mock<ConfigurationStateListener>()
-        val sut = ConfigurationModule(configStateListenerMock, Mono.just(cbsClientMock))
-        val configPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json")
-
-        given("sample configuration in file: $configPath") {
-            val arguments = arrayOf(
-                    "--configuration-file",
-                    configPath,
-                    "--health-check-api-port",
-                    "6062")
+    StepVerifier.setDefaultTimeout(Duration.ofSeconds(5))
+
+    describe("Configuration Module") {
+        val configStateListenerMock: ConfigurationStateListener = mock()
+        val cbsClientMono = Mono.fromSupplier(CbsClientMockSupplier)
+
+        val sut = ConfigurationModule(configStateListenerMock, cbsClientMono)
+
+        beforeEachTest {
+            reset(configStateListenerMock)
+            CbsClientMockSupplier.reset()
+        }
+
+        given("sample configuration in file") {
+            val configurationPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json")
+
+            val configurationUpdates = sut.hvVesConfigurationUpdates(arguments(configurationPath), mdc)
+
+            on("Config Binding Service permanently not available") {
+                CbsClientMockSupplier.setCbsClientCreationSuccessful(false)
+                val testVirtualDuration = Duration.ofMinutes(10)
+
+                it("should retry as long as possible until failing") {
+                    StepVerifier
+                            .withVirtualTime { configurationUpdates.last() }
+                            .expectSubscription()
+                            .expectNoEvent(testVirtualDuration)
+                            .thenCancel()
+                            .verifyThenAssertThat()
+                            .allOperatorErrorsAre(CbsClientMockSupplier.throwedException())
+                }
+
+                it("should notify configuration state listener about each retry") {
+                    val requestsAmount = CbsClientMockSupplier.requestsAmount.get()
+                    assertThat(requestsAmount).describedAs("CBS client requests amount").isGreaterThan(0)
+                    verify(configStateListenerMock, times(requestsAmount)).retrying()
+                }
+            }
+
+            on("Config Binding Service temporarily not available") {
+                CbsClientMockSupplier.setCbsClientCreationSuccessful(false)
+                val cbsUnavailabilityTime = Duration.ofMinutes(10)
+                whenever(CbsClientMockSupplier.cbsClientMock.get(any()))
+                        .thenReturn(Mono.just(configurationJsonWithIntervalChanged))
+
+                it("should return configuration after CBS is available again") {
+                    StepVerifier
+                            .withVirtualTime { configurationUpdates.take(1) }
+                            .expectSubscription()
+                            .expectNoEvent(cbsUnavailabilityTime)
+                            .then { CbsClientMockSupplier.setCbsClientCreationSuccessful(true) }
+                            .thenAwait(MAX_BACKOFF_INTERVAL)
+                            .expectNext(configurationWithIntervalChanged)
+                            .verifyComplete()
+                }
+            }
+
+            on("failure from CBS client during getting configuration") {
+                val exceptionFromCbsClient = MyCustomTestCbsClientException("I'm such a failure")
+                whenever(CbsClientMockSupplier.cbsClientMock.get(any()))
+                        .thenReturn(Mono.error(exceptionFromCbsClient))
+                val testVirtualDuration = Duration.ofMinutes(2)
+
+                it("should retry as long as possible until failing") {
+                    StepVerifier
+                            .withVirtualTime { configurationUpdates.last() }
+                            .expectSubscription()
+                            .expectNoEvent(testVirtualDuration)
+                            .thenCancel()
+                            .verifyThenAssertThat()
+                            .allOperatorErrorsAre(exceptionFromCbsClient)
+                }
+
+                it("should notify configuration state listener about each retry") {
+                    val requestsAmount = CbsClientMockSupplier.requestsAmount.get()
+                    assertThat(requestsAmount).describedAs("CBS client requests amount").isGreaterThan(0)
+                    verify(configStateListenerMock, times(requestsAmount)).retrying()
+                }
+            }
+
             on("configuration changes in Config Binding Service") {
-                whenever(cbsClientMock.get(any()))
+                whenever(CbsClientMockSupplier.cbsClientMock.get(any()))
                         .thenReturn(
                                 Mono.just(configurationJsonWithIntervalChanged),
                                 Mono.just(configurationJsonWithIntervalChangedAgain),
@@ -62,10 +130,7 @@ internal object ConfigurationModuleIT : Spek({
                 it("should wait $firstRequestDelayFromFile s as provided in configuration file and later" +
                         " fetch configurations in intervals specified within them") {
                     StepVerifier
-                            .withVirtualTime {
-                                sut.hvVesConfigurationUpdates(arguments, sampleMdc)
-                                        .take(3)
-                            }
+                            .withVirtualTime { configurationUpdates.take(3) }
                             .expectSubscription()
                             .expectNoEvent(firstRequestDelayFromFile)
                             .expectNext(configurationWithIntervalChanged)
@@ -80,26 +145,34 @@ internal object ConfigurationModuleIT : Spek({
     }
 })
 
+private data class MyCustomTestCbsClientException(val msg: String) : Exception(msg)
+
+private val MAX_BACKOFF_INTERVAL = Duration.ofMinutes(5)
+
+fun StepVerifier.Assertions.allOperatorErrorsAre(ex: Throwable) = hasOperatorErrorsMatching {
+    it.all { tuple -> tuple.t1.get() === ex }
+}
+
+private fun arguments(configurationPath: String) = arrayOf(
+        "--configuration-file",
+        configurationPath,
+        "--health-check-api-port",
+        "6062")
+
 private val firstRequestDelayFromFile = Duration.ofSeconds(3)
 private val firstRequestDelayFromCBS = Duration.ofSeconds(999)
 private val requestIntervalFromCBS = Duration.ofSeconds(10)
 private val anotherRequestIntervalFromCBS = Duration.ofSeconds(20)
 
-private val sampleMdc = { mapOf("k" to "v") }
-private val emptyRouting = listOf<Route>()
+private val configurationJsonWithIntervalChanged =
+        hvVesConfigurationJson(requestInterval = Some(requestIntervalFromCBS))
 
-private val configurationJsonWithIntervalChanged = JsonParser().parse("""{
-    "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
-}""").asJsonObject
+private val configurationJsonWithIntervalChangedAgain =
+        hvVesConfigurationJson(requestInterval = Some(anotherRequestIntervalFromCBS),
+                firstRequestDelay = Some(firstRequestDelayFromCBS))
 
-private val configurationJsonWithIntervalChangedAgain = JsonParser().parse("""{
-    "cbs.firstRequestDelaySec": ${firstRequestDelayFromCBS.seconds},
-    "cbs.requestIntervalSec": ${anotherRequestIntervalFromCBS.seconds}
-}""").asJsonObject
-
-private val configurationJsonWithIntervalRestored = JsonParser().parse("""{
-    "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
-}""").asJsonObject
+private val configurationJsonWithIntervalRestored =
+        hvVesConfigurationJson(requestInterval = Some(requestIntervalFromCBS))
 
 private val configurationWithIntervalChanged =
         hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
@@ -110,11 +183,4 @@ private val configurationWithIntervalChangedAgain =
 private val configurationWithIntervalRestored =
         hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
 
-private fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration): HvVesConfiguration {
-    return HvVesConfiguration(
-            ServerConfiguration(6061, Duration.ofSeconds(60)),
-            CbsConfiguration(firstRequestDelay, requestInterval),
-            SecurityConfiguration(Option.empty()),
-            CollectorConfiguration(emptyRouting, 1024 * 1024),
-            LogLevel.DEBUG)
-}
\ No newline at end of file
+
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/cbs_stub.kt
new file mode 100644 (file)
index 0000000..2491264
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.reset
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.Supplier
+
+
+internal object CbsClientMockSupplier : Supplier<CbsClient> {
+
+    private val logger = Logger(CbsClientMockSupplier::class)
+    private val cbsClientSupplierException = Exception("Test was configured to fail at client creation.")
+
+    private var shouldEmitError = false
+    val requestsAmount = AtomicInteger(0)
+    val cbsClientMock: CbsClient = mock()
+
+    override fun get(): CbsClient = requestsAmount.incrementAndGet().let {
+        if (shouldEmitError) {
+            throw cbsClientSupplierException
+        } else {
+            cbsClientMock
+        }
+    }
+
+    fun setCbsClientCreationSuccessful(creationSuccessful: Boolean) {
+        logger.trace { "Setting CBS creation success result to : $creationSuccessful" }
+        shouldEmitError = !creationSuccessful
+    }
+
+    fun throwedException(): Throwable = cbsClientSupplierException
+
+    fun reset() {
+        reset(cbsClientMock)
+        setCbsClientCreationSuccessful(true)
+        this.requestsAmount.set(0)
+    }
+}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/test_configurations.kt
new file mode 100644 (file)
index 0000000..8472f3c
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
+import com.google.gson.JsonParser
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import java.time.Duration
+
+
+internal fun hvVesConfigurationJson(listenPort: Option<Int> = None,
+                                    idleTimeoutSec: Option<Int> = None,
+                                    firstRequestDelay: Option<Duration> = None,
+                                    requestInterval: Option<Duration> = None,
+                                    logLevel: Option<String> = None,
+                                    sslDisable: Option<Boolean> = None,
+                                    keyStoreFilePath: Option<String> = None,
+                                    keyStorePasswordFilePath: Option<String> = None,
+                                    trustStoreFilePath: Option<String> = None,
+                                    trustStorePasswordFilePath: Option<String> = None) = JsonParser().parse(
+        """{
+    ${addKeyIfPresent("logLevel", logLevel)}
+    ${addKeyIfPresent("server.listenPort", listenPort)}
+    ${addKeyIfPresent("server.idleTimeoutSec", idleTimeoutSec)}
+    ${addKeyIfPresent("cbs.firstRequestDelaySec", firstRequestDelay.map { it.seconds })}
+    ${addKeyIfPresent("cbs.requestIntervalSec", requestInterval.map { it.seconds })}
+    ${addKeyIfPresent("security.sslDisable", sslDisable)}
+    ${addKeyIfPresent("security.keys.keyStoreFile", keyStoreFilePath)}
+    ${addKeyIfPresent("security.keys.keyStorePasswordFile", keyStorePasswordFilePath)}
+    ${addKeyIfPresent("security.keys.trustStoreFile", trustStoreFilePath)}
+    ${addKeyIfPresent("security.keys.trustStorePasswordFile", trustStorePasswordFilePath)}
+""".trim().removeSuffix(",") + "}"
+).asJsonObject
+
+private fun <T> addKeyIfPresent(configurationKey: String, option: Option<T>) = option
+        .map { "$configurationKey: $it," }
+        .getOrElse { "" }
+
+
+private val emptyRouting = listOf<Route>()
+
+internal fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration) =
+        HvVesConfiguration(
+                ServerConfiguration(6061, Duration.ofSeconds(60)),
+                CbsConfiguration(firstRequestDelay, requestInterval),
+                SecurityConfiguration(Option.empty()),
+                CollectorConfiguration(emptyRouting, 1024 * 1024),
+                LogLevel.DEBUG)
+
+
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapterTest.kt
new file mode 100644 (file)
index 0000000..1f6a253
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.google.gson.JsonParser
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Mono
+import reactor.test.StepVerifier
+import java.time.Duration
+
+internal object CbsClientAdapterTest : Spek({
+
+    describe("Config Binding Service Client Adapter") {
+
+        val cbsClientMock: CbsClient = mock()
+        val configStateListener: ConfigurationStateListener = mock()
+
+        given("successful client creation") {
+            val cbsClientMono = Mono.just(cbsClientMock)
+            val cut = CbsClientAdapter(cbsClientMono, firstRequestDelay, configStateListener, mdc, retry())
+
+            on("configurations stream in CBS") {
+                val firstConfigurationContent = "first"
+                val secondConfigurationContent = "second"
+                whenever(cbsClientMock.get(any())).thenReturn(
+                        configurationMono(firstConfigurationContent),
+                        configurationMono(secondConfigurationContent)
+                )
+
+                it("should return flux of fetched configurations") {
+                    StepVerifier
+                            .withVirtualTime {
+                                cut.configurationUpdates().take(2)
+                            }
+                            .expectSubscription()
+                            .expectNoEvent(firstRequestDelay)
+                            .expectNext(configuration(firstConfigurationContent))
+                            .expectNext(configuration(secondConfigurationContent))
+                            .verifyComplete()
+                }
+            }
+
+
+            on("exception from CBS client on configuration fetch") {
+
+                whenever(cbsClientMock.get(any())).thenReturn(
+                        Mono.error { sampleException }
+                )
+
+                it("should return error flux") {
+                    StepVerifier.create(cut.configurationUpdates())
+                            .expectErrorMatches { it === sampleException }
+                            .verify()
+                }
+            }
+        }
+
+        given("repeated failure during client creation") {
+            val failedCreationsAmount = 3
+            var currentFailuresCount = 0
+            val cbsClientMono = Mono.fromCallable {
+                currentFailuresCount++
+                if (currentFailuresCount <= failedCreationsAmount) {
+                    throw sampleException
+                } else {
+                    cbsClientMock
+                }
+            }
+
+            val cut = CbsClientAdapter(cbsClientMono, firstRequestDelay, configStateListener, mdc,
+                    retry(failedCreationsAmount + 1L))
+
+            on("CBS client creation") {
+                whenever(cbsClientMock.get(any())).thenReturn(configurationMono())
+
+                it("it should emit configuration after failures") {
+                    StepVerifier
+                            .withVirtualTime { cut.configurationUpdates().take(1) }
+                            .expectSubscription()
+                            .expectNoEvent(firstRequestDelay)
+                            .expectNext(configuration())
+                            .verifyComplete()
+                }
+
+                it("should call state listener when retrying") {
+                    verify(configStateListener, times(failedCreationsAmount)).retrying()
+                }
+            }
+        }
+    }
+})
+
+private val firstRequestDelay = Duration.ofSeconds(10)
+private val sampleException = Exception("Best regards from CBS")
+
+private fun configuration(content: String = "whatever") =
+        JsonParser().parse("""{ "content": ${content} }""").asJsonObject
+
+private fun configurationMono(content: String = "whatever") = Mono.just(configuration(content))
index 3141545..0954b76 100644 (file)
@@ -21,7 +21,6 @@ package org.onap.dcae.collectors.veshv.config.impl
 
 import arrow.core.Some
 import com.google.gson.JsonParser
-import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.mock
 import com.nhaarman.mockitokotlin2.times
 import com.nhaarman.mockitokotlin2.verify
@@ -49,11 +48,11 @@ internal object CbsConfigurationProviderTest : Spek({
 
     describe("Configuration provider") {
 
-        val cbsClientAdapter = mock<CbsClientAdapter>()
-        val configStateListener = mock<ConfigurationStateListener>()
+        val cbsClientAdapter: CbsClientAdapter = mock()
+        val configStateListener: ConfigurationStateListener = mock()
 
         given("configuration is never in cbs") {
-            val cbsClientMock = mock<CbsClient>()
+            val cbsClientMock: CbsClient = mock()
             val configProvider = constructConfigurationProvider(
                     constructCbsClientAdapter(cbsClientMock, configStateListener),
                     configStateListener
@@ -73,7 +72,7 @@ internal object CbsConfigurationProviderTest : Spek({
             val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener)
 
             on("new configuration") {
-                whenever(cbsClientAdapter.configurationUpdates(any()))
+                whenever(cbsClientAdapter.configurationUpdates())
                         .thenReturn(Flux.just(validConfiguration))
                 it("should use received configuration") {
 
@@ -110,7 +109,7 @@ internal object CbsConfigurationProviderTest : Spek({
             )
 
             on("new configuration") {
-                whenever(cbsClientAdapter.configurationUpdates(any()))
+                whenever(cbsClientAdapter.configurationUpdates())
                         .thenReturn(Flux.just(invalidConfiguration))
 
                 it("should interrupt the flux") {
@@ -193,21 +192,15 @@ private val invalidConfiguration = JsonParser().parse("""
 private val firstRequestDelay = Duration.ofMillis(1)
 private val configParser = JsonConfigurationParser()
 
-private fun retry(iterationCount: Long = 1) = Retry
-        .onlyIf<Any> { it.iteration() <= iterationCount }
-        .fixedBackoff(Duration.ofNanos(1))
-
 private fun constructCbsClientAdapter(cbsClientMock: CbsClient, configStateListener: ConfigurationStateListener) =
-        CbsClientAdapter(Mono.just(cbsClientMock), configStateListener, firstRequestDelay, retry())
+        CbsClientAdapter(Mono.just(cbsClientMock), firstRequestDelay, configStateListener, mdc, retry())
 
 private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
                                            configurationStateListener: ConfigurationStateListener,
-                                           iterationCount: Long = 1
-): CbsConfigurationProvider =
-        CbsConfigurationProvider(
-                cbsClientAdapter,
-                configParser,
-                configurationStateListener,
-                { mapOf("k" to "v") },
-                retry(iterationCount)
-        )
+                                           iterationCount: Long = 1) = CbsConfigurationProvider(
+        cbsClientAdapter,
+        configParser,
+        configurationStateListener,
+        mdc,
+        retry(iterationCount)
+)
index f07af07..d2b56b6 100644 (file)
@@ -24,7 +24,9 @@ import com.nhaarman.mockitokotlin2.whenever
 import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import reactor.retry.Retry
 import java.nio.file.Paths
+import java.time.Duration
 
 private fun resourcePathAsString(resource: String) =
         Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString()
@@ -51,4 +53,11 @@ private val sampleSink = mock<KafkaSink>().also {
 }
 
 internal val sampleStreamsDefinition = listOf(sampleSink)
-internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file
+internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
+
+internal val mdc = { mapOf("mdc_key" to "mdc_value") }
+
+internal fun retry(iterationCount: Long = 1) = Retry
+        .onlyIf<Any> { it.iteration() <= iterationCount }
+        .fixedBackoff(Duration.ofNanos(1))
+