Add Markers logs
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaSink.kt
index fd08ba3..07ce760 100644 (file)
@@ -21,8 +21,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -38,7 +38,8 @@ import java.util.concurrent.atomic.AtomicLong
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+                         private val ctx: ClientContext) : Sink {
     private val sentMessages = AtomicLong(0)
 
     override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
@@ -68,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
     }
 
     private fun logSentMessage(sentMsg: RoutedMessage) {
-        logger.trace(ctx) {
+        logger.trace(ctx::asMap, Marker.INVOKE) {
             val msgNum = sentMessages.incrementAndGet()
             "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
         }