Add log diagnostic context
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / wire / WireChunkDecoder.kt
index 4a2ef6b..349b078 100644 (file)
@@ -21,12 +21,13 @@ package org.onap.dcae.collectors.veshv.impl.wire
 
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
@@ -38,8 +39,8 @@ import reactor.core.publisher.SynchronousSink
  */
 internal class WireChunkDecoder(
         private val decoder: WireFrameDecoder,
-        alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
-    private val streamBuffer = alloc.compositeBuffer()
+        private val ctx: ClientContext) {
+    private val streamBuffer = ctx.alloc.compositeBuffer()
 
     fun release() {
         streamBuffer.release()
@@ -53,7 +54,7 @@ internal class WireChunkDecoder(
         } else {
             streamBuffer.addComponent(true, byteBuf)
             generateFrames()
-                    .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+                    .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
                     .doFinally { streamBuffer.discardReadComponents() }
         }
     }
@@ -84,15 +85,15 @@ internal class WireChunkDecoder(
     }
 
     private fun logIncomingMessage(wire: ByteBuf) {
-        logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+        logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" }
     }
 
     private fun logDecodedWireMessage(wire: WireFrameMessage) {
-        logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+        logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" }
     }
 
     private fun logEndOfData() {
-        logger.trace { "End of data in current TCP buffer" }
+        logger.trace(ctx) { "End of data in current TCP buffer" }
     }
 
     companion object {