Add Message tracing logger service.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / KafkaBasicAuthMessageProducerService.kt
index 86c04f6..42adcd7 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -38,6 +39,8 @@ class KafkaBasicAuthMessageProducerService(
 
     private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
 
+    private val messageLoggerService = MessageLoggerService()
+
     override suspend fun sendMessageNB(message: Any): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
         return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -56,8 +59,10 @@ class KafkaBasicAuthMessageProducerService(
         }
 
         val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+        val recordHeaders = record.headers()
+        messageLoggerService.messageProducing(recordHeaders)
         headers?.let {
-            headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+            headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
         }
         val callback = Callback { metadata, exception ->
             log.info("message published offset(${metadata.offset()}, headers :$headers )")