Add log diagnostic context
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / factory / CollectorFactory.kt
index 5c96e1c..2008fc3 100644 (file)
@@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                        private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val collector: AtomicReference<Collector> = AtomicReference()
+        val config: AtomicReference<CollectorConfiguration> = AtomicReference()
         configuration()
-                .map(this::createVesHvCollector)
                 .doOnNext {
-                    logger.info("Using updated configuration for new connections")
+                    logger.info { "Using updated configuration for new connections" }
                     healthState.changeState(HealthDescription.HEALTHY)
                 }
                 .doOnError {
-                    logger.error("Failed to acquire configuration from consul")
+                    logger.error { "Failed to acquire configuration from consul" }
                     healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
                 }
-                .subscribe(collector::set)
-        return collector::getOption
+                .subscribe(config::set)
+        return { ctx: ClientContext ->
+            config.getOption().map { config -> createVesHvCollector(config, ctx) }
+        }
     }
 
-    private fun createVesHvCollector(config: CollectorConfiguration): Collector {
-        return VesHvCollector(
-                wireChunkDecoderSupplier = { alloc ->
-                    WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
-                },
-                protobufDecoder = VesDecoder(),
-                router = Router(config.routing),
-                sink = sinkProvider(config),
-                metrics = metrics)
-    }
+    private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
+            clientContext = ctx,
+            wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+            protobufDecoder = VesDecoder(),
+            router = Router(config.routing, ctx),
+            sink = sinkProvider(config, ctx),
+            metrics = metrics)
 
     companion object {
         private val logger = Logger(CollectorFactory::class)