Add of adapter fun with param ClientContext
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / VesHvCollector.kt
index 0d07504..cf73aed 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -33,7 +34,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
@@ -56,7 +56,7 @@ internal class VesHvCollector(
                     .transform(::decodeProtobufPayload)
                     .transform(::filterInvalidProtobufMessages)
                     .transform(::routeMessage)
-                    .onErrorResume { logger.handleReactiveStreamError(clientContext::asMap, it) }
+                    .onErrorResume { logger.handleReactiveStreamError(clientContext, it) }
                     .doFinally { releaseBuffersMemory() }
                     .then()