Support CBS request interval reconfiguration 06/88306/18
authorkjaniak <kornel.janiak@nokia.com>
Wed, 22 May 2019 20:19:49 +0000 (22:19 +0200)
committerkjaniak <kornel.janiak@nokia.com>
Wed, 5 Jun 2019 14:01:22 +0000 (16:01 +0200)
Change-Id: Ie8892e33b2f6a58d6076f66e6cc6a2df830dfa48
Issue-ID: DCAEGEN2-1525
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt [new file with mode: 0644]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json [new file with mode: 0644]
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt

index ded7583..e243afe 100644 (file)
@@ -25,18 +25,24 @@ import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
+import org.onap.dcae.collectors.veshv.config.impl.CbsClientAdapter
 import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
 import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
 import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.time.Duration
 
-class ConfigurationModule {
+class ConfigurationModule internal constructor(private val configStateListener: ConfigurationStateListener,
+                                               private val cbsClient: Mono<CbsClient>) {
 
     private val cmd = HvVesCommandLineParser()
     private val configParser = JsonConfigurationParser()
@@ -44,10 +50,15 @@ class ConfigurationModule {
     private val configValidator = ConfigurationValidator()
     private val configTransformer = ConfigurationTransformer()
 
+    constructor(configStateListener: ConfigurationStateListener) : this(
+            configStateListener,
+            CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment())
+    )
+
+
     fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
 
     fun hvVesConfigurationUpdates(args: Array<String>,
-                                  configStateListener: ConfigurationStateListener,
                                   mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
             Mono.just(cmd.getConfigurationFile(args))
                     .throwOnLeft(::MissingArgumentException)
@@ -56,23 +67,35 @@ class ConfigurationModule {
                     .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
                     .cache()
                     .flatMapMany { basePartialConfig ->
-                        cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
-                                .invoke()
-                                .map { configMerger.merge(basePartialConfig, it) }
-                                .map(configValidator::validate)
-                                .throwOnLeft()
-                                .map(configTransformer::toFinalConfiguration)
+                        cbsClientAdapter(basePartialConfig).let { cbsClientAdapter ->
+                            cbsConfigurationProvider(cbsClientAdapter, mdc)
+                                    .invoke()
+                                    .map { configMerger.merge(basePartialConfig, it) }
+                                    .map(configValidator::validate)
+                                    .throwOnLeft()
+                                    .map(configTransformer::toFinalConfiguration)
+                                    .doOnNext {
+                                        cbsClientAdapter.updateCbsInterval(it.cbs.requestInterval, mdc)
+                                    }
+                        }
                     }
 
-    private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
-                                         configStateListener: ConfigurationStateListener,
+    private fun cbsClientAdapter(basePartialConfig: PartialConfiguration) =
+            CbsClientAdapter(
+                    cbsClient,
+                    configStateListener,
+                    cbsConfigurationFrom(basePartialConfig).firstRequestDelay,
+                    retrySpec
+            )
+
+    private fun cbsConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
                                          mdc: MappedDiagnosticContext) =
             CbsConfigurationProvider(
-                    CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
-                    cbsConfigurationFrom(basePartialConfig),
+                    cbsClientAdapter,
                     configParser,
                     configStateListener,
-                    mdc)
+                    mdc,
+                    retrySpec)
 
     private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
             configValidator.validatedCbsConfiguration(basePartialConfig)
@@ -80,6 +103,13 @@ class ConfigurationModule {
 
     companion object {
         private val logger = Logger(ConfigurationModule::class)
+
+        private const val MAX_RETRIES = 5L
+        private const val INITIAL_BACKOFF = 10L
+        private val retrySpec: Retry<Any> = Retry.any<Any>()
+                .retryMax(MAX_RETRIES)
+                .fixedBackoff(Duration.ofSeconds(INITIAL_BACKOFF))
+                .jitter(Jitter.random())
     }
 
 }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
new file mode 100644 (file)
index 0000000..d31f658
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcae.collectors.veshv.utils.rx.delayElements
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
+import reactor.core.publisher.Mono
+import reactor.retry.Retry
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicReference
+
+
+internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
+                                private val configurationStateListener: ConfigurationStateListener,
+                                private val firstRequestDelay: Duration,
+                                private val retrySpec: Retry<Any>) {
+
+    private val requestInterval = AtomicReference<Duration>(Duration.ZERO)
+
+    fun configurationUpdates(mdc: MappedDiagnosticContext) = cbsClientMono
+            .doOnNext {
+                logger.info(mdc) {
+                    "CBS client successfully created, first request will be sent in ${firstRequestDelay.seconds} s"
+                }
+            }
+            .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+            .retryWhen(retry(mdc))
+            .delayElement(firstRequestDelay)
+            .flatMapMany(::toPeriodicalConfigurations)
+            .distinctUntilChanged()
+
+    fun updateCbsInterval(intervalUpdate: Duration, mdc: MappedDiagnosticContext) {
+        requestInterval.set(intervalUpdate)
+        logger.debug(mdc) { "CBS request interval changed to: ${intervalUpdate.seconds} s" }
+    }
+
+    private fun toPeriodicalConfigurations(cbsClient: CbsClient) =
+            Mono.just(configurationRequest())
+                    .repeat()
+                    .map(CbsRequest::withNewInvocationId)
+                    .flatMap(cbsClient::get)
+                    .transform(delayElements(requestInterval::get))
+
+    private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
+
+    private fun retry(mdc: MappedDiagnosticContext) = retrySpec.doOnRetry {
+        logger.withWarn(mdc) {
+            log("Exception from HV-VES cbs client, retrying subscription", it.exception())
+        }
+        configurationStateListener.retrying()
+    }
+
+    companion object {
+        private val logger = Logger(CbsClientAdapter::class)
+    }
+
+}
index 4982c73..6efa38e 100644 (file)
@@ -22,56 +22,31 @@ package org.onap.dcae.collectors.veshv.config.impl
 import arrow.core.toOption
 import com.google.gson.JsonObject
 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
 import org.onap.dcae.collectors.veshv.utils.reader
 import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
 import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
 import reactor.retry.Retry
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
-internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
-                                        private val cbsConfiguration: CbsConfiguration,
+internal class CbsConfigurationProvider(private val cbsClientAdapter: CbsClientAdapter,
                                         private val configParser: JsonConfigurationParser,
-                                        private val streamParser: StreamFromGsonParser<KafkaSink>,
                                         private val configurationStateListener: ConfigurationStateListener,
                                         private val mdc: MappedDiagnosticContext,
-                                        retrySpec: Retry<Any>
-
+                                        retrySpec: Retry<Any>,
+                                        private val streamParser: StreamFromGsonParser<KafkaSink> =
+                                                StreamFromGsonParsers.kafkaSinkParser()
 ) {
-    constructor(cbsClientMono: Mono<CbsClient>,
-                cbsConfig: CbsConfiguration,
-                configParser: JsonConfigurationParser,
-                configurationStateListener: ConfigurationStateListener,
-                mdc: MappedDiagnosticContext) :
-            this(
-                    cbsClientMono,
-                    cbsConfig,
-                    configParser,
-                    StreamFromGsonParsers.kafkaSinkParser(),
-                    configurationStateListener,
-                    mdc,
-                    Retry.any<Any>()
-                            .retryMax(MAX_RETRIES)
-                            .fixedBackoff(cbsConfig.requestInterval)
-                            .jitter(Jitter.random())
-            )
-
     private val retry = retrySpec.doOnRetry {
         logger.withWarn(mdc) {
             log("Exception from configuration provider client, retrying subscription", it.exception())
@@ -80,22 +55,12 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
     }
 
     operator fun invoke(): Flux<PartialConfiguration> =
-            cbsClientMono
-                    .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
-                    .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
+            cbsClientAdapter.configurationUpdates(mdc)
+                    .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
+                    .map(::parseConfiguration)
+                    .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
+                    .onErrorLog(logger, mdc) { "Error while creating configuration" }
                     .retryWhen(retry)
-                    .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
-                    .flatMapMany(::handleUpdates)
-
-    private fun handleUpdates(cbsClient: CbsClient) = cbsClient
-            .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
-                    cbsConfiguration.firstRequestDelay,
-                    cbsConfiguration.requestInterval)
-            .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
-            .map(::parseConfiguration)
-            .doOnNext { logger.info(mdc) { "Successfully parsed configuration json to:\n$it" } }
-            .onErrorLog(logger, mdc) { "Error while creating configuration" }
-            .retryWhen(retry)
 
     private fun parseConfiguration(json: JsonObject) =
             configParser
@@ -110,7 +75,6 @@ internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClien
                     .toList()
 
     companion object {
-        private const val MAX_RETRIES = 5L
         private val logger = Logger(CbsConfigurationProvider::class)
     }
 }
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModuleIT.kt
new file mode 100644 (file)
index 0000000..1b2dbc2
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.api
+
+import arrow.core.Option
+import com.google.gson.JsonParser
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.model.*
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.absoluteResourcePath
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import reactor.core.publisher.Mono
+import reactor.test.StepVerifier
+import java.time.Duration
+
+
+internal object ConfigurationModuleIT : Spek({
+    describe("configuration module") {
+        val cbsClientMock = mock<CbsClient>()
+        val configStateListenerMock = mock<ConfigurationStateListener>()
+        val sut = ConfigurationModule(configStateListenerMock, Mono.just(cbsClientMock))
+        val configPath = javaClass.absoluteResourcePath("/insecureSampleConfig.json")
+
+        given("sample configuration in file: $configPath") {
+            val arguments = arrayOf(
+                    "--configuration-file",
+                    configPath,
+                    "--health-check-api-port",
+                    "6062")
+            on("configuration changes in Config Binding Service") {
+                whenever(cbsClientMock.get(any()))
+                        .thenReturn(
+                                Mono.just(configurationJsonWithIntervalChanged),
+                                Mono.just(configurationJsonWithIntervalChangedAgain),
+                                Mono.just(configurationJsonWithIntervalRestored)
+                        )
+                it("should wait $firstRequestDelayFromFile s as provided in configuration file and later" +
+                        " fetch configurations in intervals specified within them") {
+                    StepVerifier
+                            .withVirtualTime {
+                                sut.hvVesConfigurationUpdates(arguments, sampleMdc)
+                                        .take(3)
+                            }
+                            .expectSubscription()
+                            .expectNoEvent(firstRequestDelayFromFile)
+                            .expectNext(configurationWithIntervalChanged)
+                            .expectNoEvent(requestIntervalFromCBS)
+                            .expectNext(configurationWithIntervalChangedAgain)
+                            .expectNoEvent(anotherRequestIntervalFromCBS)
+                            .expectNext(configurationWithIntervalRestored)
+                            .verifyComplete()
+                }
+            }
+        }
+    }
+})
+
+private val firstRequestDelayFromFile = Duration.ofSeconds(3)
+private val firstRequestDelayFromCBS = Duration.ofSeconds(999)
+private val requestIntervalFromCBS = Duration.ofSeconds(10)
+private val anotherRequestIntervalFromCBS = Duration.ofSeconds(20)
+
+private val sampleMdc = { mapOf("k" to "v") }
+private val emptyRouting = listOf<Route>()
+
+private val configurationJsonWithIntervalChanged = JsonParser().parse("""{
+    "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
+}""").asJsonObject
+
+private val configurationJsonWithIntervalChangedAgain = JsonParser().parse("""{
+    "cbs.firstRequestDelaySec": ${firstRequestDelayFromCBS.seconds},
+    "cbs.requestIntervalSec": ${anotherRequestIntervalFromCBS.seconds}
+}""").asJsonObject
+
+private val configurationJsonWithIntervalRestored = JsonParser().parse("""{
+    "cbs.requestIntervalSec": ${requestIntervalFromCBS.seconds}
+}""").asJsonObject
+
+private val configurationWithIntervalChanged =
+        hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
+
+private val configurationWithIntervalChangedAgain =
+        hvVesConfiguration(firstRequestDelayFromCBS, anotherRequestIntervalFromCBS)
+
+private val configurationWithIntervalRestored =
+        hvVesConfiguration(firstRequestDelayFromFile, requestIntervalFromCBS)
+
+private fun hvVesConfiguration(firstRequestDelay: Duration, requestInterval: Duration): HvVesConfiguration {
+    return HvVesConfiguration(
+            ServerConfiguration(6061, Duration.ofSeconds(60)),
+            CbsConfiguration(firstRequestDelay, requestInterval),
+            SecurityConfiguration(Option.empty()),
+            CollectorConfiguration(emptyRouting, 1024 * 1024),
+            LogLevel.DEBUG)
+}
\ No newline at end of file
index 8c3c22a..3141545 100644 (file)
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.config.impl
 import arrow.core.Some
 import com.google.gson.JsonParser
 import com.nhaarman.mockitokotlin2.any
-import com.nhaarman.mockitokotlin2.eq
 import com.nhaarman.mockitokotlin2.mock
 import com.nhaarman.mockitokotlin2.times
 import com.nhaarman.mockitokotlin2.verify
@@ -34,10 +33,8 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
@@ -52,12 +49,15 @@ internal object CbsConfigurationProviderTest : Spek({
 
     describe("Configuration provider") {
 
-        val cbsClient = mock<CbsClient>()
-        val cbsClientMock = Mono.just(cbsClient)
+        val cbsClientAdapter = mock<CbsClientAdapter>()
         val configStateListener = mock<ConfigurationStateListener>()
 
         given("configuration is never in cbs") {
-            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
+            val cbsClientMock = mock<CbsClient>()
+            val configProvider = constructConfigurationProvider(
+                    constructCbsClientAdapter(cbsClientMock, configStateListener),
+                    configStateListener
+            )
 
             on("waiting for configuration") {
                 val waitTime = Duration.ofMillis(100)
@@ -70,16 +70,16 @@ internal object CbsConfigurationProviderTest : Spek({
         }
 
         given("valid configuration from cbs") {
-            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
+            val configProvider = constructConfigurationProvider(cbsClientAdapter, configStateListener)
 
             on("new configuration") {
-                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+                whenever(cbsClientAdapter.configurationUpdates(any()))
                         .thenReturn(Flux.just(validConfiguration))
                 it("should use received configuration") {
 
                     StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
-
+                                assertThat(it.requestIntervalSec).isEqualTo(Some(5L))
                                 assertThat(it.listenPort).isEqualTo(Some(6061))
                                 assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
 
@@ -106,11 +106,11 @@ internal object CbsConfigurationProviderTest : Spek({
         given("invalid configuration from cbs") {
             val iterationCount = 3L
             val configProvider = constructConfigurationProvider(
-                    cbsClientMock, configStateListener, iterationCount
+                    cbsClientAdapter, configStateListener, iterationCount
             )
 
             on("new configuration") {
-                whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
+                whenever(cbsClientAdapter.configurationUpdates(any()))
                         .thenReturn(Flux.just(invalidConfiguration))
 
                 it("should interrupt the flux") {
@@ -146,6 +146,7 @@ private val validConfiguration = JsonParser().parse("""
 {
     "server.listenPort": 6061,
     "server.idleTimeoutSec": 60,
+    "cbs.requestIntervalSec": 5,
     "streams_publishes": {
         "$PERF3GPP_REGIONAL": {
             "type": "kafka",
@@ -190,26 +191,23 @@ private val invalidConfiguration = JsonParser().parse("""
 }""").asJsonObject
 
 private val firstRequestDelay = Duration.ofMillis(1)
-private val requestInterval = Duration.ofMillis(1)
-private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
 private val configParser = JsonConfigurationParser()
 
-private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
+private fun retry(iterationCount: Long = 1) = Retry
+        .onlyIf<Any> { it.iteration() <= iterationCount }
+        .fixedBackoff(Duration.ofNanos(1))
+
+private fun constructCbsClientAdapter(cbsClientMock: CbsClient, configStateListener: ConfigurationStateListener) =
+        CbsClientAdapter(Mono.just(cbsClientMock), configStateListener, firstRequestDelay, retry())
+
+private fun constructConfigurationProvider(cbsClientAdapter: CbsClientAdapter,
                                            configurationStateListener: ConfigurationStateListener,
                                            iterationCount: Long = 1
-): CbsConfigurationProvider {
-
-    val retry = Retry
-            .onlyIf<Any> { it.iteration() <= iterationCount }
-            .fixedBackoff(Duration.ofNanos(1))
-
-    return CbsConfigurationProvider(
-            cbsClientMono,
-            CbsConfiguration(firstRequestDelay, requestInterval),
-            configParser,
-            streamParser,
-            configurationStateListener,
-            { mapOf("k" to "v") },
-            retry
-    )
-}
+): CbsConfigurationProvider =
+        CbsConfigurationProvider(
+                cbsClientAdapter,
+                configParser,
+                configurationStateListener,
+                { mapOf("k" to "v") },
+                retry(iterationCount)
+        )
diff --git a/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json b/sources/hv-collector-configuration/src/test/resources/insecureSampleConfig.json
new file mode 100644 (file)
index 0000000..4fc5921
--- /dev/null
@@ -0,0 +1,8 @@
+{
+  "logLevel": "DEBUG",
+  "server.listenPort": 6061,
+  "server.idleTimeoutSec": 60,
+  "cbs.firstRequestDelaySec": 3,
+  "cbs.requestIntervalSec": 5,
+  "security.sslDisable": "true"
+}
\ No newline at end of file
index 123d2dc..3dcb5ce 100644 (file)
@@ -42,7 +42,6 @@ private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
 private val logger = Logger("$VES_HV_PACKAGE.main")
 
 private val hvVesServer = AtomicReference<ServerHandle>()
-private val configurationModule = ConfigurationModule()
 private val sslContextFactory = SslContextFactory()
 private val maxCloseTime = Duration.ofSeconds(10)
 
@@ -52,10 +51,10 @@ fun main(args: Array<String>) {
             HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
         }
     }
-
+    val configurationModule = ConfigurationModule(configStateListener)
     HealthCheckServer.start(configurationModule.healthCheckPort(args)).block()
     configurationModule
-            .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
+            .hvVesConfigurationUpdates(args, ServiceContext::mdc)
             .publishOn(Schedulers.single(Schedulers.elastic()))
             .doOnNext {
                 logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
index ceccbcb..e188605 100644 (file)
 package org.onap.dcae.collectors.veshv.utils.rx
 
 import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.core.publisher.toMono
+import java.time.Duration
 
 fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
         toMono().then(Mono.fromCallable(callback))
+
+fun <T> delayElements(intervalSupplier: () -> Duration): (Flux<T>) -> Flux<T> = { flux ->
+    flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) }
+}