Add log diagnostic context
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaSink.kt
index c4d6c87..fd08ba3 100644 (file)
@@ -20,6 +20,9 @@
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -35,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
     private val sentMessages = AtomicLong(0)
 
     override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
@@ -45,17 +48,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
                     if (it.isSuccessful()) {
                         Mono.just(it)
                     } else {
-                        logger.warn(it.exception()) { "Failed to send message to Kafka" }
+                        logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
                         Mono.empty<SenderResult<RoutedMessage>>()
                     }
                 }
                 .map { it.correlationMetadata() }
 
-        return if (logger.traceEnabled) {
-            result.doOnNext(::logSentMessage)
-        } else {
-            result
-        }
+        return result.doOnNext(::logSentMessage)
     }
 
     private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -69,7 +68,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
     }
 
     private fun logSentMessage(sentMsg: RoutedMessage) {
-        logger.trace {
+        logger.trace(ctx) {
             val msgNum = sentMessages.incrementAndGet()
             "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
         }