Add Message tracing logger service.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / KafkaBasicAuthMessageProducerService.kt
index 008e924..42adcd7 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import org.apache.commons.lang.builder.ToStringBuilder
+import org.apache.kafka.clients.producer.Callback
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig.*
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.serialization.StringSerializer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.slf4j.LoggerFactory
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-import org.springframework.kafka.support.SendResult
-import org.springframework.util.concurrent.ListenableFutureCallback
+import java.nio.charset.Charset
 
 class KafkaBasicAuthMessageProducerService(
         private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
@@ -34,42 +37,46 @@ class KafkaBasicAuthMessageProducerService(
 
     private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
 
-    private var kafkaTemplate: KafkaTemplate<String, Any>? = null
+    private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
+
+    private val messageLoggerService = MessageLoggerService()
 
     override suspend fun sendMessageNB(message: Any): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
-        return sendMessage(messageProducerProperties.topic!!, message)
+        return sendMessageNB(messageProducerProperties.topic!!, message)
     }
 
-    override suspend fun sendMessageNB(topic: String, message: Any): Boolean {
-        val serializedMessage = when (message) {
-            is String -> {
-                message
-            }
-            else -> {
-                message.asJsonType().toString()
-            }
-        }
-        val future = messageTemplate().send(topic, serializedMessage)
+    override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
+        checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
+        return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+    }
 
-        future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> {
-            override fun onSuccess(result: SendResult<String, Any>) {
-                log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]")
-            }
+    override suspend fun sendMessageNB(topic: String, message: Any,
+                                       headers: MutableMap<String, String>?): Boolean {
+        val byteArrayMessage = when (message) {
+            is String -> message.toByteArray(Charset.defaultCharset())
+            else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+        }
 
-            override fun onFailure(ex: Throwable) {
-                log.error("Unable to send message", ex)
-            }
-        })
+        val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+        val recordHeaders = record.headers()
+        messageLoggerService.messageProducing(recordHeaders)
+        headers?.let {
+            headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
+        }
+        val callback = Callback { metadata, exception ->
+            log.info("message published offset(${metadata.offset()}, headers :$headers )")
+        }
+        messageTemplate().send(record, callback).get()
         return true
     }
 
-    private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
-        log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+    fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
+        log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
         val configProps = hashMapOf<String, Any>()
         configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
         configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-        configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
         if (messageProducerProperties.clientId != null) {
             configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
         }
@@ -79,15 +86,11 @@ class KafkaBasicAuthMessageProducerService(
         if (additionalConfig != null) {
             configProps.putAll(additionalConfig)
         }
-        return DefaultKafkaProducerFactory(configProps)
-    }
 
-    fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
-        log.info("Prepering templates")
-        if (kafkaTemplate == null) {
-            kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
+        if (kafkaProducer == null) {
+            kafkaProducer = KafkaProducer(configProps)
         }
-        return kafkaTemplate!!
+        return kafkaProducer!!
     }
 }