import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val collector: AtomicReference<Collector> = AtomicReference()
+ val config: AtomicReference<CollectorConfiguration> = AtomicReference()
configuration()
- .map(this::createVesHvCollector)
.doOnNext {
- logger.info("Using updated configuration for new connections")
+ logger.info { "Using updated configuration for new connections" }
healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
- logger.error("Failed to acquire configuration from consul")
+ logger.error { "Failed to acquire configuration from consul" }
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
- .subscribe(collector::set)
- return collector::getOption
+ .subscribe(config::set)
+ return { ctx: ClientContext ->
+ config.getOption().map { config -> createVesHvCollector(config, ctx) }
+ }
}
- private fun createVesHvCollector(config: CollectorConfiguration): Collector {
- return VesHvCollector(
- wireChunkDecoderSupplier = { alloc ->
- WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
- },
- protobufDecoder = VesDecoder(),
- router = Router(config.routing),
- sink = sinkProvider(config),
- metrics = metrics)
- }
+ private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
+ clientContext = ctx,
+ wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing, ctx),
+ sink = sinkProvider(config, ctx),
+ metrics = metrics)
companion object {
private val logger = Logger(CollectorFactory::class)