Add of adapter fun with param ClientContext
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / wire / WireChunkDecoder.kt
index 349b078..b735138 100644 (file)
@@ -27,10 +27,11 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Flux.defer
 import reactor.core.publisher.SynchronousSink
 
 /**
@@ -46,7 +47,7 @@ internal class WireChunkDecoder(
         streamBuffer.release()
     }
 
-    fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
+    fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = defer {
         logIncomingMessage(byteBuf)
         if (byteBuf.readableBytes() == 0) {
             byteBuf.release()
@@ -54,7 +55,7 @@ internal class WireChunkDecoder(
         } else {
             streamBuffer.addComponent(true, byteBuf)
             generateFrames()
-                    .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
+                    .onErrorResume { logger.handleReactiveStreamError(ctx, it, Flux.error(it)) }
                     .doFinally { streamBuffer.discardReadComponents() }
         }
     }