Add Message Header support for tracing.
author Brinda Santh <bs2796@att.com>
Mon, 28 Oct 2019 17:59:29 +0000 (13:59 -0400)
committer Brinda Santh <bs2796@att.com>
Mon, 28 Oct 2019 17:59:29 +0000 (13:59 -0400)
Issue-ID: CCSDK-1046
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: If6a416fe157a1a8ed4d93878142b3c959d49a035

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt

index 1cd8a2a..0b899f2 100644
@@ -35,6 +35,8 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
     lateinit var groupId: String
     var clientId: String? = null
     var topic: String? = null
+    var autoCommit: Boolean = true
+    var autoOffsetReset: String = "latest"
     var pollMillSec: Long = 1000
     var pollRecords: Int = -1
 }
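
A minimal sketch of how the two new consumer properties could be populated when the configuration is built programmatically, assuming KafkaBasicAuthMessageConsumerProperties (a subclass of this data class) can be instantiated directly; the broker address, group id and topic values are illustrative, not defaults shipped with the module:

    // Illustrative configuration only; field names follow the data class above.
    val consumerProperties = KafkaBasicAuthMessageConsumerProperties().apply {
        bootstrapServers = "127.0.0.1:9092"   // illustrative broker address
        groupId = "cds-consumer-group"        // illustrative consumer group
        topic = "cds-producer-topic"          // illustrative topic
        autoCommit = false                    // the caller commits offsets explicitly
        autoOffsetReset = "earliest"          // replay from the beginning on first subscription
    }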
index e33d41c..7d81386 100644
@@ -20,15 +20,31 @@ import kotlinx.coroutines.runBlocking
 
 interface BlueprintMessageProducerService {
 
-    fun sendMessage(message: Any): Boolean = runBlocking {
-        sendMessageNB(message)
+    fun sendMessage(message: Any): Boolean {
+        return sendMessage(message = message, headers = null)
     }
 
-    fun sendMessage(topic: String, message: Any): Boolean = runBlocking {
-        sendMessageNB(topic, message)
+    fun sendMessage(topic: String, message: Any): Boolean {
+        return sendMessage(topic, message, null)
     }
 
-    suspend fun sendMessageNB(message: Any): Boolean
+    fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
+        sendMessageNB(message = message, headers = headers)
+    }
+
+    fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
+        sendMessageNB(topic, message, headers)
+    }
+
+    suspend fun sendMessageNB(message: Any): Boolean {
+        return sendMessageNB(message = message, headers = null)
+    }
+
+    suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean
+
+    suspend fun sendMessageNB(topic: String, message: Any): Boolean {
+        return sendMessageNB(topic, message, null)
+    }
 
-    suspend fun sendMessageNB(topic: String, message: Any): Boolean
+    suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
 }
\ No newline at end of file
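
A usage sketch for the new header-aware overloads, assuming blueprintMessageProducerService is some implementation of this interface; the header keys and topic shown are illustrative tracing fields, not names mandated by the module:

    val headers: MutableMap<String, String> = hashMapOf(
            "RequestID" to "1234",          // illustrative tracing header
            "SubRequestID" to "5678")       // illustrative tracing header

    // Blocking variant; delegates to sendMessageNB(topic, message, headers) via runBlocking.
    val published = blueprintMessageProducerService.sendMessage(
            topic = "cds-producer-topic",   // illustrative topic
            message = """{"status":"ok"}""",
            headers = headers)

    // From inside a coroutine, prefer the non-blocking variant:
    // blueprintMessageProducerService.sendMessageNB(message = payload, headers = headers)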
index b5d444a..b99be0a 100644
@@ -24,30 +24,37 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.nio.charset.Charset
 import java.time.Duration
 import kotlin.concurrent.thread
 
-class KafkaBasicAuthMessageConsumerService(
+open class KafkaBasicAuthMessageConsumerService(
         private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
     : BlueprintMessageConsumerService {
 
-    private val channel = Channel<String>()
-    private var kafkaConsumer: Consumer<String, String>? = null
+    val channel = Channel<String>()
+    var kafkaConsumer: Consumer<String, ByteArray>? = null
     val log = logger(KafkaBasicAuthMessageConsumerService::class)
 
     @Volatile
     var keepGoing = true
 
-    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> {
+    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
         val configProperties = hashMapOf<String, Any>()
         configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
-        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+        configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
+        /**
+         * earliest: automatically reset the offset to the earliest offset
+         * latest: automatically reset the offset to the latest offset
+         */
+        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
         if (messageConsumerProperties.clientId != null) {
             configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
         }
@@ -95,7 +102,7 @@ class KafkaBasicAuthMessageConsumerService(
                             consumerRecord.value()?.let {
                                 launch {
                                     if (!channel.isClosedForSend) {
-                                        channel.send(it)
+                                        channel.send(String(it, Charset.defaultCharset()))
                                     } else {
                                         log.error("Channel is closed to receive message")
                                     }
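
For reference, a minimal sketch of consuming the ByteArray-valued records directly through the service's kafkaConsumer() builder; consumerProperties is assumed to be a populated KafkaBasicAuthMessageConsumerProperties, and the "RequestID" header key is purely illustrative:

    import java.nio.charset.Charset
    import java.time.Duration

    val consumer = KafkaBasicAuthMessageConsumerService(consumerProperties).kafkaConsumer()
    consumer.subscribe(listOf(consumerProperties.topic!!))
    consumer.poll(Duration.ofMillis(consumerProperties.pollMillSec)).forEach { record ->
        // Values are now ByteArray; decode with the same default charset the service uses.
        val payload = record.value()?.let { String(it, Charset.defaultCharset()) }
        val traceId = record.headers().lastHeader("RequestID")?.let { String(it.value()) }
        println("received payload=$payload, traceId=$traceId")
    }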
index 1c93bb0..86c04f6 100644
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import org.apache.commons.lang.builder.ToStringBuilder
+import org.apache.kafka.clients.producer.Callback
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig.*
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.common.serialization.StringSerializer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.slf4j.LoggerFactory
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-import org.springframework.kafka.support.SendResult
-import org.springframework.util.concurrent.ListenableFutureCallback
+import java.nio.charset.Charset
 
 class KafkaBasicAuthMessageProducerService(
         private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
@@ -34,42 +36,42 @@ class KafkaBasicAuthMessageProducerService(
 
     private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
 
-    private var kafkaTemplate: KafkaTemplate<String, Any>? = null
+    private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
 
     override suspend fun sendMessageNB(message: Any): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
-        return sendMessage(messageProducerProperties.topic!!, message)
+        return sendMessageNB(messageProducerProperties.topic!!, message)
     }
 
-    override suspend fun sendMessageNB(topic: String, message: Any): Boolean {
-        val serializedMessage = when (message) {
-            is String -> {
-                message
-            }
-            else -> {
-                message.asJsonType().toString()
-            }
-        }
-        val future = messageTemplate().send(topic, serializedMessage)
+    override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
+        checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
+        return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+    }
 
-        future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> {
-            override fun onSuccess(result: SendResult<String, Any>) {
-                log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]")
-            }
+    override suspend fun sendMessageNB(topic: String, message: Any,
+                                       headers: MutableMap<String, String>?): Boolean {
+        val byteArrayMessage = when (message) {
+            is String -> message.toByteArray(Charset.defaultCharset())
+            else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+        }
 
-            override fun onFailure(ex: Throwable) {
-                log.error("Unable to send message", ex)
-            }
-        })
+        val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+        headers?.let {
+            headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+        }
+        val callback = Callback { metadata, exception ->
+            if (exception != null) log.error("failed to publish message, headers : $headers", exception)
+            else log.info("message published offset(${metadata.offset()}), headers : $headers")
+        }
+        messageTemplate().send(record, callback).get()
         return true
     }
 
-    private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
-        log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+    fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaProducer<String, ByteArray> {
+        log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
         val configProps = hashMapOf<String, Any>()
         configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
         configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-        configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
         if (messageProducerProperties.clientId != null) {
             configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
         }
@@ -79,14 +81,11 @@ class KafkaBasicAuthMessageProducerService(
         if (additionalConfig != null) {
             configProps.putAll(additionalConfig)
         }
-        return DefaultKafkaProducerFactory(configProps)
-    }
 
-    fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
-        if (kafkaTemplate == null) {
-            kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
+        if (kafkaProducer == null) {
+            kafkaProducer = KafkaProducer(configProps)
         }
-        return kafkaTemplate!!
+        return kafkaProducer!!
     }
 }
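
For completeness, a self-contained sketch of the header mechanism the refactored service relies on, using a plain KafkaProducer<String, ByteArray>; the broker address, topic, record key and header name are illustrative:

    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.header.internals.RecordHeader
    import org.apache.kafka.common.serialization.ByteArraySerializer
    import org.apache.kafka.common.serialization.StringSerializer

    val producer = KafkaProducer<String, ByteArray>(mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java))

    val record = ProducerRecord<String, ByteArray>("cds-producer-topic", "key-1",
            """{"status":"ok"}""".toByteArray())
    // Tracing headers ride on the record; "RequestID" is an illustrative key.
    record.headers().add(RecordHeader("RequestID", "1234".toByteArray()))

    producer.send(record) { metadata, exception ->
        if (exception != null) println("publish failed: ${exception.message}")
        else println("published to ${metadata.topic()} at offset ${metadata.offset()}")
    }
    producer.flush()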
 
index f4e85a9..86c2ec5 100644
@@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest {
                 partitionsEndMap[partition] = records
                 topicsCollection.add(partition.topic())
             }
-            val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
+            val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
             mockKafkaConsumer.subscribe(topicsCollection)
             mockKafkaConsumer.rebalance(partitions)
             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
             for (i in 1..10) {
-                val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
-                        "I am message $i")
+                val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+                        "I am message $i".toByteArray())
                 mockKafkaConsumer.addRecord(record)
             }
 
@@ -131,7 +131,10 @@ open class BlueprintMessageConsumerServiceTest {
             launch {
                 repeat(5) {
                     delay(100)
-                    blueprintMessageProducerService.sendMessage("this is my message($it)")
+                    val headers: MutableMap<String, String> = hashMapOf()
+                    headers["id"] = it.toString()
+                    blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+                            headers = headers)
                 }
             }
             delay(5000)
index 31bcc15..f23624f 100644
@@ -20,18 +20,18 @@ import io.mockk.every
 import io.mockk.mockk
 import io.mockk.spyk
 import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.RecordMetadata
 import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.SendResult
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
-import org.springframework.util.concurrent.SettableListenableFuture
+import java.util.concurrent.Future
 import kotlin.test.Test
 import kotlin.test.assertTrue
 
@@ -57,12 +57,12 @@ open class BlueprintMessageProducerServiceTest {
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
                     .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
 
-            val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>()
+            val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
 
-            val future = SettableListenableFuture<SendResult<String, Any>>()
-            //future.setException(BluePrintException("failed sending"))
+            val responseMock = mockk<Future<RecordMetadata>>()
+            every { responseMock.get() } returns mockk()
 
-            every { mockKafkaTemplate.send(any(), any()) } returns future
+            every { mockKafkaTemplate.send(any(), any()) } returns responseMock
 
             val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)