Improve Kafka Producer callback messages 09/110709/2
authorJulien Fontaine <julien.fontaine@bell.ca>
Wed, 29 Jul 2020 23:06:00 +0000 (19:06 -0400)
committerJulien Fontaine <julien.fontaine@bell.ca>
Wed, 29 Jul 2020 23:12:30 +0000 (19:12 -0400)
Check if message sent is a BP request/response to print CBA name and version in the callback message.

Issue-ID: CCSDK-2623
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I526a01299a3bfca3d1c4a10ae19dead46393a9ae

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt

index 8de1f05..8958d4f 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.header.internals.RecordHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
@@ -79,8 +80,18 @@ class KafkaMessageProducerService(
             headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
         }
         val callback = Callback { metadata, exception ->
-            if (exception == null) log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers")
-            else log.error("ERROR : ${exception.message}")
+            if (exception != null)
+                log.error("ERROR : ${exception.message}")
+            else {
+                var logMessage = when (clonedMessage) {
+                    is ExecutionServiceInput ->
+                        "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
+                    is ExecutionServiceOutput ->
+                        "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
+                    else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers"
+                }
+                log.info(logMessage)
+            }
         }
         messageTemplate().send(record, callback)
         return true