Move ConfigurationProvider to config module 98/83698/8
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 29 Mar 2019 10:22:24 +0000 (11:22 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Mon, 1 Apr 2019 10:32:42 +0000 (12:32 +0200)
Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d
Issue-ID: DCAEGEN2-1347
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
23 files changed:
sources/hv-collector-configuration/pom.xml
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/adapters.kt [moved from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt with 84% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt with 96% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Exceptions.kt with 100% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/routing.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt with 100% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProvider.kt [moved from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt with 55% similarity]
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt [moved from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt with 97% similarity]
sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt [moved from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt with 85% similarity]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt [moved from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt with 68% similarity]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt [deleted file]
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt [deleted file]
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt

index 792b9ea..b6ec4ca 100644 (file)
             <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
             <artifactId>cbs-client</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-extra</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-stdlib-jdk8</artifactId>
index dd1b171..9684484 100644 (file)
@@ -22,10 +22,16 @@ package org.onap.dcae.collectors.veshv.config.api
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
 import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
-import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
 import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
+import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
+import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow
 import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
 import reactor.core.publisher.Flux
 
 class ConfigurationModule {
@@ -34,16 +40,28 @@ class ConfigurationModule {
     private val configReader = FileConfigurationReader()
     private val configValidator = ConfigurationValidator()
 
-    private lateinit var initialConfig: HvVesConfiguration
-
     fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
 
-    fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
+    fun hvVesConfigurationUpdates(args: Array<String>,
+                                  configStateListener: ConfigurationStateListener,
+                                  mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
             Flux.just(cmd.getConfigurationFile(args))
                     .throwOnLeft { MissingArgumentException(it.message, it.cause) }
                     .map { it.reader().use(configReader::loadConfig) }
-                    .map { configValidator.validate(it) }
-                    .throwOnLeft { ValidationException(it.message) }
-                    .doOnNext { initialConfig = it }
+                    .cache()
+                    .flatMap { basePartialConfig ->
+                        val baseConfig = configValidator.validate(basePartialConfig)
+                                .rightOrThrow { ValidationException(it.message) }
+                        val cbsConfigProvider = CbsConfigurationProvider(
+                                CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+                                baseConfig.cbs,
+                                configStateListener,
+                                mdc)
+                        val merger = ConfigurationMerger()
+                        cbsConfigProvider()
+                                .map { merger.merge(basePartialConfig, it) }
+                                .map { configValidator.validate(it) }
+                                .throwOnLeft { ValidationException(it.message) }
+                    }
 
 }
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.config.api
 
-class ParsingException(message: String, cause: Throwable) : Exception(message, cause)
+interface ConfigurationStateListener {
+    fun retrying() {}
+}
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.config.impl
 
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
 import com.google.gson.JsonObject
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
 import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
@@ -42,69 +43,77 @@ import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.retry.Jitter
 import reactor.retry.Retry
-import java.time.Duration
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
-internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
-                                         private val firstRequestDelay: Duration,
-                                         private val requestInterval: Duration,
-                                         private val healthState: HealthState,
-                                         private val streamParser: StreamFromGsonParser<KafkaSink>,
-                                         retrySpec: Retry<Any>
+internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
+                                        private val cbsConfiguration: CbsConfiguration,
+                                        private val streamParser: StreamFromGsonParser<KafkaSink>,
+                                        private val configurationStateListener: ConfigurationStateListener,
+                                        retrySpec: Retry<Any>,
+                                        private val mdc: MappedDiagnosticContext
 
-) : ConfigurationProvider {
-    constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
-            cbsClientMono,
-            params.firstRequestDelay,
-            params.requestInterval,
-            HealthState.INSTANCE,
-            StreamFromGsonParsers.kafkaSinkParser(),
-            Retry.any<Any>()
-                    .retryMax(MAX_RETRIES)
-                    .fixedBackoff(params.requestInterval)
-                    .jitter(Jitter.random())
-    )
+) {
+    constructor(cbsClientMono: Mono<CbsClient>,
+                cbsConfig: CbsConfiguration,
+                configurationStateListener: ConfigurationStateListener,
+                mdc: MappedDiagnosticContext) :
+            this(
+                    cbsClientMono,
+                    cbsConfig,
+                    StreamFromGsonParsers.kafkaSinkParser(),
+                    configurationStateListener,
+                    Retry.any<Any>()
+                            .retryMax(MAX_RETRIES)
+                            .fixedBackoff(cbsConfig.requestInterval)
+                            .jitter(Jitter.random()),
+                    mdc
+            )
 
     private val retry = retrySpec.doOnRetry {
-        logger.withWarn(ServiceContext::mdc) {
+        logger.withWarn(mdc) {
             log("Exception from configuration provider client, retrying subscription", it.exception())
         }
-        healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+        configurationStateListener.retrying()
     }
 
-    override fun invoke(): Flux<Routing> =
+    operator fun invoke(): Flux<PartialConfiguration> =
             cbsClientMono
-                    .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
-                    .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
+                    .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
+                    .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
                     .retryWhen(retry)
-                    .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
+                    .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
                     .flatMapMany(::handleUpdates)
 
     private fun handleUpdates(cbsClient: CbsClient) = cbsClient
             .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
-                    firstRequestDelay,
-                    requestInterval)
-            .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
+                    cbsConfiguration.firstRequestDelay,
+                    cbsConfiguration.requestInterval)
+            .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
             .map(::createRoutingDescription)
-            .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
+            .onErrorLog(logger, mdc) { "Error while creating configuration" }
             .retryWhen(retry)
+            .map { PartialConfiguration(collector = Some(PartialCollectorConfig(routing = it))) }
 
-    private fun createRoutingDescription(configuration: JsonObject): Routing = try {
-        DataStreams.namedSinks(configuration)
+    private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
+        val routes = DataStreams.namedSinks(configuration)
                 .filter(streamOfType(KAFKA))
                 .map(streamParser::unsafeParse)
                 .map { Route(it.name(), it) }
                 .asIterable()
                 .toList()
+        Some(routes)
     } catch (e: NullPointerException) {
-        throw ParsingException("Failed to parse configuration", e)
+        logger.withWarn(mdc) {
+            log("Invalid streams configuration", e)
+        }
+        None
     }
 
     companion object {
         private const val MAX_RETRIES = 5L
-        private val logger = Logger(ConfigurationProviderImpl::class)
+        private val logger = Logger(CbsConfigurationProvider::class)
     }
 }
index 04bba7e..3e599b5 100644 (file)
@@ -63,9 +63,7 @@ internal class ConfigurationValidator {
                 securityConfiguration,
 // TOD0: swap when ConfigurationMerger is implemented
 //                    collectorConfiguration
-                CollectorConfiguration(-1,
-                        "I do not exist. I'm not even a URL :o",
-                        emptyList()),
+                CollectorConfiguration(emptyList()),
 // end TOD0
                 logLevel
         )
index 3e93a40..c1a9829 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.config.impl
 
-import arrow.core.*
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
+import arrow.core.getOrElse
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.CommandLineParser
 import org.apache.commons.cli.DefaultParser
@@ -54,6 +54,6 @@ internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
 
 internal data class PartialCollectorConfig(
         val maxRequestSizeBytes: Option<Int> = None,
-        val kafkaServers: Option<List<InetSocketAddress>> = None,
+        val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part
         val routing: Option<Routing> = None
 )
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.config.impl
 
 import com.google.gson.JsonParser
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.eq
 import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
 import com.nhaarman.mockitokotlin2.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
@@ -30,13 +32,12 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
 import reactor.core.publisher.Flux
-
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
 import reactor.test.StepVerifier
@@ -46,16 +47,16 @@ import java.time.Duration
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
-internal object ConfigurationProviderImplTest : Spek({
+internal object CbsConfigurationProviderTest : Spek({
 
     describe("Configuration provider") {
 
         val cbsClient: CbsClient = mock()
         val cbsClientMock: Mono<CbsClient> = Mono.just(cbsClient)
-        val healthStateProvider = HealthState.INSTANCE
+        val configStateListener: ConfigurationStateListener = mock()
 
         given("configuration is never in cbs") {
-            val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
 
             on("waiting for configuration") {
                 val waitTime = Duration.ofMillis(100)
@@ -68,7 +69,7 @@ internal object ConfigurationProviderImplTest : Spek({
         }
 
         given("valid configuration from cbs") {
-            val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
+            val configProvider = constructConfigurationProvider(cbsClientMock, configStateListener)
 
             on("new configuration") {
                 whenever(cbsClient.updates(any(), eq(firstRequestDelay), eq(requestInterval)))
@@ -77,8 +78,9 @@ internal object ConfigurationProviderImplTest : Spek({
 
                     StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
-                                val route1 = it.elementAt(0)
-                                val route2 = it.elementAt(1)
+                                val routes = it.collector.orNull()!!.routing.orNull()!!
+                                val route1 = routes.elementAt(0)
+                                val route2 = routes.elementAt(1)
                                 val receivedSink1 = route1.sink
                                 val receivedSink2 = route2.sink
 
@@ -102,7 +104,7 @@ internal object ConfigurationProviderImplTest : Spek({
         given("invalid configuration from cbs") {
             val iterationCount = 3L
             val configProvider = constructConfigurationProvider(
-                    cbsClientMock, healthStateProvider, iterationCount
+                    cbsClientMock, configStateListener, iterationCount
             )
 
             on("new configuration") {
@@ -114,11 +116,8 @@ internal object ConfigurationProviderImplTest : Spek({
                             .verifyError()
                 }
 
-                it("should update the health state") {
-                    StepVerifier.create(healthStateProvider().take(iterationCount))
-                            .expectNextCount(iterationCount - 1)
-                            .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
-                            .verifyComplete()
+                it("should call state listener when retrying") {
+                    verify(configStateListener, times(iterationCount.toInt())).retrying()
                 }
             }
         }
@@ -190,18 +189,18 @@ private val requestInterval = Duration.ofMillis(1)
 private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
 
 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
-                                           healthState: HealthState,
+                                           configurationStateListener: ConfigurationStateListener,
                                            iterationCount: Long = 1
-): ConfigurationProviderImpl {
+): CbsConfigurationProvider {
 
     val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
 
-    return ConfigurationProviderImpl(
+    return CbsConfigurationProvider(
             cbsClientMono,
-            firstRequestDelay,
-            requestInterval,
-            healthState,
+            CbsConfiguration(firstRequestDelay, requestInterval),
             streamParser,
-            retry
+            configurationStateListener,
+            retry,
+            { mapOf("k" to "v") }
     )
 }
index 1b92d90..e3156a0 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,8 +39,6 @@ interface SinkProvider : Closeable {
     operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
 }
 
-typealias ConfigurationProvider = () -> Flux<Routing>
-
 interface Metrics {
     fun notifyBytesReceived(size: Int)
     fun notifyMessageReceived(msg: WireFrameMessage)
index 5c64c70..ba0a9ee 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
-import arrow.core.Option
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -33,7 +32,7 @@ interface Collector {
 }
 
 interface CollectorProvider : Closeable {
-    operator fun invoke(ctx: ClientContext): Option<Collector>
+    operator fun invoke(ctx: ClientContext): Collector
 }
 
 interface Server {
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.factory
 
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -32,9 +28,4 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperti
  */
 object AdapterFactory {
     fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
-
-    fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
-            ConfigurationProviderImpl(
-                    CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
-                    config)
 }
index 2b29acd..1c79abd 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.factory
 
-import arrow.core.Option
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class CollectorFactory(private val configuration: ConfigurationProvider,
+class CollectorFactory(private val configuration: CollectorConfiguration,
                        private val sinkProvider: SinkProvider,
                        private val metrics: Metrics,
-                       private val maxPayloadSizeBytes: Int,
-                       private val healthState: HealthState = HealthState.INSTANCE) {
+                       private val maxPayloadSizeBytes: Int) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val config = AtomicReference<Routing>()
-        configuration()
-                .doOnNext {
-                    logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
-                    healthState.changeState(HealthDescription.HEALTHY)
-                }
-                .doOnError {
-                    logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
-                    logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
-                    healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
-                }
-                .subscribe(config::set)
 
         return object : CollectorProvider {
-            override fun invoke(ctx: ClientContext): Option<Collector> =
-                    config.getOption().map { createVesHvCollector(it, ctx) }
+            override fun invoke(ctx: ClientContext): Collector =
+                    createVesHvCollector(ctx)
 
             override fun close() = sinkProvider.close()
         }
     }
 
-    private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
+    private fun createVesHvCollector(ctx: ClientContext): Collector =
             VesHvCollector(
                     clientContext = ctx,
                     wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
                     protobufDecoder = VesDecoder(),
-                    router = Router(routing, sinkProvider, ctx, metrics),
+                    router = Router(configuration.routing, sinkProvider, ctx, metrics),
                     metrics = metrics)
 
     companion object {
index fab9656..3e19414 100644 (file)
@@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
     private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
         metrics.notifyClientConnected()
         logger.info(clientContext::fullMdc) { "Handling new client connection" }
-        return collectorProvider(clientContext).fold(
-                {
-                    logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
-                    nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
-                },
-                handleClient(clientContext, nettyInbound)
-        )
+        val collector = collectorProvider(clientContext)
+        return collector.handleClient(clientContext, nettyInbound)
     }
 
-    private fun handleClient(clientContext: ClientContext,
-                             nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+    private fun Collector.handleClient(clientContext: ClientContext,
+                             nettyInbound: NettyInbound) =
         withConnectionFrom(nettyInbound) { connection ->
             connection
                     .configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
                     .logConnectionClosed(clientContext)
         }.run {
-            collector.handleConnection(nettyInbound.createDataStream())
+            handleConnection(nettyInbound.createDataStream())
         }
-    }
 
     private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
             onReadIdle(timeout.toMillis()) {
index 61a9a35..35dfba8 100644 (file)
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
@@ -56,8 +57,7 @@ object PerformanceSpecification : Spek({
     describe("VES High Volume Collector performance") {
         it("should handle multiple clients in reasonable time") {
             val sink = CountingSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicRouting)
+            val sut = Sut(CollectorConfiguration(basicRouting), sink)
 
             val numMessages: Long = 300_000
             val runs = 4
@@ -87,8 +87,7 @@ object PerformanceSpecification : Spek({
 
         it("should disconnect on transmission errors") {
             val sink = CountingSink()
-            val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicRouting)
+            val sut = Sut(CollectorConfiguration(basicRouting), sink)
 
             val numMessages: Long = 100_000
             val timeout = Duration.ofSeconds(30)
index ec54060..1217c47 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.tests.component
 
-import arrow.core.getOrElse
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
@@ -27,6 +26,7 @@ import io.netty.buffer.UnpooledByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
@@ -34,8 +34,6 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
 import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
 import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
@@ -49,27 +47,22 @@ import java.util.concurrent.atomic.AtomicBoolean
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class Sut(sink: Sink = StoringSink()) : Closeable {
-    val configurationProvider = FakeConfigurationProvider()
-    val healthStateProvider = FakeHealthState()
+class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : Closeable {
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
     val metrics = FakeMetrics()
     val sinkProvider = DummySinkProvider(sink)
 
     private val collectorFactory = CollectorFactory(
-            configurationProvider,
+            configuration,
             sinkProvider,
             metrics,
-            MAX_PAYLOAD_SIZE_BYTES,
-            healthStateProvider
+            MAX_PAYLOAD_SIZE_BYTES
     )
 
     private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
-        get() = collectorProvider(ClientContext(alloc)).getOrElse {
-            throw IllegalStateException("Collector not available.")
-        }
+        get() = collectorProvider(ClientContext(alloc))
 
 
     fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
@@ -107,16 +100,10 @@ class DummySinkProvider(private val sink: Sink) : SinkProvider {
 private val timeout = Duration.ofSeconds(10)
 
 fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
-        Sut(AlwaysSuccessfulSink()).apply {
-            configurationProvider.updateConfiguration(routing)
-        }
+        Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
 
 fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
-        Sut(AlwaysFailingSink()).apply {
-            configurationProvider.updateConfiguration(routing)
-        }
+        Sut(CollectorConfiguration(routing), AlwaysFailingSink())
 
 fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
-        Sut(DelayingSink(delay)).apply {
-            configurationProvider.updateConfiguration(routing)
-        }
+        Sut(CollectorConfiguration(routing), DelayingSink(delay))
index 5d215fc..6a718ee 100644 (file)
@@ -25,6 +25,8 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
@@ -166,9 +168,7 @@ object VesHvSpecification : Spek({
         }
 
         it("should be able to direct 2 messages from different domains to one topic") {
-            val (sut, sink) = vesHvWithStoringSink()
-
-            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
+            val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)
 
             val messages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP),
@@ -202,150 +202,6 @@ object VesHvSpecification : Spek({
         }
     }
 
-    describe("configuration update") {
-
-        val defaultTimeout = Duration.ofSeconds(10)
-
-        given("successful configuration change") {
-
-            lateinit var sut: Sut
-            lateinit var sink: StoringSink
-
-            beforeEachTest {
-                vesHvWithStoringSink().run {
-                    sut = first
-                    sink = second
-                }
-            }
-
-            it("should update collector") {
-                val firstCollector = sut.collector
-
-                sut.configurationProvider.updateConfiguration(alternativeRouting)
-                val collectorAfterUpdate = sut.collector
-
-                assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
-            }
-
-            it("should start routing messages") {
-
-                sut.configurationProvider.updateConfiguration(emptyRouting)
-
-                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
-                assertThat(messages).isEmpty()
-
-                sut.configurationProvider.updateConfiguration(basicRouting)
-
-                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
-                assertThat(messagesAfterUpdate).hasSize(1)
-                val message = messagesAfterUpdate[0]
-
-                assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
-                        .isEqualTo(PERF3GPP_TOPIC)
-                assertThat(message.partition).describedAs("routed message partition")
-                        .isEqualTo(None)
-            }
-
-            it("should change domain routing") {
-
-                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
-                assertThat(messages).hasSize(1)
-                val firstMessage = messages[0]
-
-                assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
-                        .isEqualTo(PERF3GPP_TOPIC)
-                assertThat(firstMessage.partition).describedAs("routed message partition")
-                        .isEqualTo(None)
-
-
-                sut.configurationProvider.updateConfiguration(alternativeRouting)
-
-                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
-                assertThat(messagesAfterUpdate).hasSize(2)
-                val secondMessage = messagesAfterUpdate[1]
-
-                assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
-                        .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
-                assertThat(secondMessage.partition).describedAs("routed message partition")
-                        .isEqualTo(None)
-            }
-
-            it("should update routing for each client sending one message") {
-
-                val messagesAmount = 10
-                val messagesForEachTopic = 5
-
-                Flux.range(0, messagesAmount).doOnNext {
-                    if (it == messagesForEachTopic) {
-                        sut.configurationProvider.updateConfiguration(alternativeRouting)
-                    }
-                }.doOnNext {
-                    sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
-                }.then().block(defaultTimeout)
-
-
-                val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
-                assertThat(messages.size).isEqualTo(messagesAmount)
-                assertThat(messagesForEachTopic)
-                        .describedAs("amount of messages routed to each topic")
-                        .isEqualTo(firstTopicMessagesCount)
-                        .isEqualTo(secondTopicMessagesCount)
-            }
-
-            it("should not update routing for client sending continuous stream of messages") {
-
-                val messageStreamSize = 10
-                val pivot = 5
-
-                val incomingMessages = Flux.range(0, messageStreamSize)
-                        .doOnNext {
-                            if (it == pivot) {
-                                sut.configurationProvider.updateConfiguration(alternativeRouting)
-                                println("config changed")
-                            }
-                        }
-                        .map { vesWireFrameMessage(PERF3GPP) }
-
-
-                sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
-
-                val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
-
-                assertThat(messages.size).isEqualTo(messageStreamSize)
-                assertThat(firstTopicMessagesCount)
-                        .describedAs("amount of messages routed to first topic")
-                        .isEqualTo(messageStreamSize)
-
-                assertThat(secondTopicMessagesCount)
-                        .describedAs("amount of messages routed to second topic")
-                        .isEqualTo(0)
-            }
-
-            it("should mark the application healthy") {
-                assertThat(sut.healthStateProvider.currentHealth)
-                        .describedAs("application health state")
-                        .isEqualTo(HealthDescription.HEALTHY)
-            }
-        }
-
-        given("failed configuration change") {
-            val (sut, _) = vesHvWithStoringSink()
-            sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
-            sut.configurationProvider.updateConfiguration(basicRouting)
-
-            it("should mark the application unhealthy ") {
-                assertThat(sut.healthStateProvider.currentHealth)
-                        .describedAs("application health state")
-                        .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
-            }
-        }
-    }
-
     describe("request validation") {
         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
             val (sut, sink) = vesHvWithStoringSink()
@@ -362,9 +218,8 @@ object VesHvSpecification : Spek({
 
 })
 
-private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
     val sink = StoringSink()
-    val sut = Sut(sink)
-    sut.configurationProvider.updateConfiguration(basicRouting)
+    val sut = Sut(CollectorConfiguration(routing), sink)
     return Pair(sut, sink)
 }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
deleted file mode 100644 (file)
index c25771b..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import reactor.core.publisher.Flux
-
-class FakeHealthState : HealthState {
-
-    lateinit var currentHealth: HealthDescription
-
-    override fun changeState(healthDescription: HealthDescription) {
-        currentHealth = healthDescription
-    }
-
-    override fun invoke(): Flux<HealthDescription> {
-        throw NotImplementedError()
-    }
-}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
deleted file mode 100644 (file)
index c465fd9..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.tests.fakes
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import reactor.core.publisher.FluxProcessor
-import reactor.core.publisher.UnicastProcessor
-import reactor.retry.RetryExhaustedException
-
-
-class FakeConfigurationProvider : ConfigurationProvider {
-    private var shouldThrowException = false
-    private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
-
-    fun updateConfiguration(routing: Routing) =
-            if (shouldThrowException) {
-                configStream.onError(RetryExhaustedException("I'm so tired"))
-            } else {
-                configStream.onNext(routing)
-            }
-
-
-    fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
-        this.shouldThrowException = shouldThrowException
-    }
-
-    override fun invoke() = configStream
-}
index 059e802..22d8000 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.main
 
 import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -41,10 +42,25 @@ private val hvVesServer = AtomicReference<ServerHandle>()
 private val configurationModule = ConfigurationModule()
 
 fun main(args: Array<String>) {
+    val configStateListener = object : ConfigurationStateListener {
+        override fun retrying() {
+            HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+        }
+    }
+
     HealthCheckServer.start(configurationModule.healthCheckPort(args))
     configurationModule
-            .hvVesConfigurationUpdates(args)
+            .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
             .publishOn(Schedulers.single(Schedulers.elastic()))
+            .doOnNext {
+                logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
+                HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
+            }
+            .doOnError {
+                logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
+                logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
+                HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
+            }
             .doOnNext(::startServer)
             .doOnError(::logServerStartFailed)
             .neverComplete() // TODO: remove after merging configuration stream with cbs
index aed4d92..c079cc5 100644 (file)
@@ -23,8 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.factory.AdapterFactory
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -59,11 +58,10 @@ object VesServer {
 
     private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
             CollectorFactory(
-                    AdapterFactory.configurationProvider(config.cbs),
+                    config.collector,
                     AdapterFactory.sinkCreatorFactory(),
                     MicrometerMetrics.INSTANCE,
-                    config.server.maxPayloadSizeBytes,
-                    HealthState.INSTANCE
+                    config.server.maxPayloadSizeBytes
             )
 
     private fun logServerStarted(handle: ServerHandle) =