Add log diagnostic context
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / VesHvCollector.kt
index 4176de9..0d07504 100644 (file)
@@ -21,18 +21,18 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Either
 import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -42,28 +42,27 @@ import reactor.core.publisher.Mono
  * @since May 2018
  */
 internal class VesHvCollector(
-        private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
+        private val clientContext: ClientContext,
+        private val wireChunkDecoder: WireChunkDecoder,
         private val protobufDecoder: VesDecoder,
         private val router: Router,
         private val sink: Sink,
         private val metrics: Metrics) : Collector {
 
-    override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
-            wireChunkDecoderSupplier(alloc).let { wireDecoder ->
-                dataStream
-                        .transform { decodeWireFrame(it, wireDecoder) }
-                        .transform(::filterInvalidWireFrame)
-                        .transform(::decodeProtobufPayload)
-                        .transform(::filterInvalidProtobufMessages)
-                        .transform(::routeMessage)
-                        .onErrorResume { logger.handleReactiveStreamError(it) }
-                        .doFinally { releaseBuffersMemory(wireDecoder) }
-                        .then()
-            }
+    override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+            dataStream
+                    .transform { decodeWireFrame(it) }
+                    .transform(::filterInvalidWireFrame)
+                    .transform(::decodeProtobufPayload)
+                    .transform(::filterInvalidProtobufMessages)
+                    .transform(::routeMessage)
+                    .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) }
+                    .doFinally { releaseBuffersMemory() }
+                    .then()
 
-    private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
+    private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
             .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
-            .concatMap(decoder::decode)
+            .concatMap(wireChunkDecoder::decode)
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
     private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
@@ -75,7 +74,7 @@ internal class VesHvCollector(
 
     private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
             .decode(rawPayload)
-            .filterFailedWithLog(logger,
+            .filterFailedWithLog(logger, clientContext::asMap,
                     { "Ves event header decoded successfully" },
                     { "Failed to decode ves event header, reason: ${it.message}" })
 
@@ -89,15 +88,15 @@ internal class VesHvCollector(
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
-            .filterEmptyWithLog(logger,
+            .filterEmptyWithLog(logger, clientContext::asMap,
                     { "Found route for message: ${it.topic}, partition: ${it.partition}" },
                     { "Could not find route for message" })
 
-    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-            .also { logger.debug("Released buffer memory after handling message stream") }
+    private fun releaseBuffersMemory() = wireChunkDecoder.release()
+            .also { logger.debug { "Released buffer memory after handling message stream" } }
 
     fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
-            filterFailedWithLog(logger, predicate)
+            filterFailedWithLog(logger, clientContext::asMap, predicate)
 
     companion object {
         private val logger = Logger(VesHvCollector::class)