Make use of Kafka Key for Audit service and Kafka listener 93/110893/4
authorJulien Fontaine <julien.fontaine@bell.ca>
Tue, 4 Aug 2020 15:57:56 +0000 (11:57 -0400)
committerJulien Fontaine <julien.fontaine@bell.ca>
Tue, 4 Aug 2020 22:24:12 +0000 (18:24 -0400)
* When message is sent by audit service, key will be the  CBA name
* When sent by kafka listener (self-service api), key is the same as the request message key consumed. If not specified, a random UUID
* MessageProducer interface refactoring :
  * add 'key' parameter to specify a key
  * add default value null to paramater 'headers' to remove some unnecessary method

Issue-ID: CCSDK-2628
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I68580151184c87104c07037f379276dd8c8c71c7

ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt

index 7e6bf68..af8d902 100644 (file)
@@ -260,6 +260,7 @@ open class MessagePrioritizationConsumerTest {
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.id
                     blueprintMessageProducerService.sendMessageNB(
+                        key = "mykey",
                         message = it.asJsonString(false),
                         headers = headers
                     )
@@ -272,6 +273,7 @@ open class MessagePrioritizationConsumerTest {
                         val headers: MutableMap<String, String> = hashMapOf()
                         headers["id"] = it.id
                         blueprintMessageProducerService.sendMessageNB(
+                            key = "mykey",
                             message = it.asJsonString(false),
                             headers = headers
                         )
@@ -284,6 +286,7 @@ open class MessagePrioritizationConsumerTest {
                         val headers: MutableMap<String, String> = hashMapOf()
                         headers["id"] = it.id
                         blueprintMessageProducerService.sendMessageNB(
+                            key = "mykey",
                             message = it.asJsonString(false),
                             headers = headers
                         )
index f74abcd..311d35c 100644 (file)
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
 import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.streams.Topology
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -29,15 +30,15 @@ interface ConsumerFunction
 
 interface BlueprintMessageConsumerService {
 
-    suspend fun subscribe(): Channel<String> {
+    suspend fun subscribe(): Channel<ConsumerRecord<String, ByteArray>> {
         return subscribe(null)
     }
 
     /** Subscribe to the Kafka channel with [additionalConfig] */
-    suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
+    suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>>
 
     /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
-    suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+    suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<ConsumerRecord<String, ByteArray>>
 
     /** Consume and execute dynamic function [consumerFunction] */
     suspend fun consume(consumerFunction: ConsumerFunction) {
index cdc65a1..66d3a5b 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.runBlocking
+import java.util.UUID
 
 interface BlueprintMessageProducerService {
 
-    fun sendMessage(message: Any): Boolean {
-        return sendMessage(message = message, headers = null)
+    fun sendMessage(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+        sendMessageNB(key, message, headers)
     }
 
-    fun sendMessage(topic: String, message: Any): Boolean {
-        return sendMessage(topic, message, null)
+    fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+        sendMessageNB(key, topic, message, headers)
     }
 
-    fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
-        sendMessageNB(message = message, headers = headers)
-    }
-
-    fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
-        sendMessageNB(topic, message, headers)
-    }
-
-    suspend fun sendMessageNB(message: Any): Boolean {
-        return sendMessageNB(message = message, headers = null)
-    }
-
-    suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean
-
-    suspend fun sendMessageNB(topic: String, message: Any): Boolean {
-        return sendMessageNB(topic, message, null)
-    }
+    suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean
 
-    suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
+    suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean
 }
index cdcd419..a0932e9 100644 (file)
@@ -19,13 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import java.nio.charset.Charset
 import java.time.Duration
 import kotlin.concurrent.thread
 
@@ -35,7 +34,7 @@ open class KafkaMessageConsumerService(
     BlueprintMessageConsumerService {
 
     val log = logger(KafkaMessageConsumerService::class)
-    val channel = Channel<String>()
+    val channel = Channel<ConsumerRecord<String, ByteArray>>()
     var kafkaConsumer: Consumer<String, ByteArray>? = null
 
     @Volatile
@@ -49,14 +48,14 @@ open class KafkaMessageConsumerService(
         return KafkaConsumer(configProperties)
     }
 
-    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         /** get to topic names */
         val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
         check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
         return subscribe(consumerTopic, additionalConfig)
     }
 
-    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         /** Create Kafka consumer */
         kafkaConsumer = kafkaConsumer(additionalConfig)
 
@@ -78,14 +77,10 @@ open class KafkaMessageConsumerService(
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
                             /** execute the command block */
-                            consumerRecord.value()?.let {
-                                launch {
-                                    if (!channel.isClosedForSend) {
-                                        channel.send(String(it, Charset.defaultCharset()))
-                                    } else {
-                                        log.error("Channel is closed to receive message")
-                                    }
-                                }
+                            if (!channel.isClosedForSend) {
+                                channel.send(consumerRecord)
+                            } else {
+                                log.error("Channel is closed to receive message")
                             }
                         }
                     }
index 8958d4f..59e9192 100644 (file)
@@ -29,7 +29,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
-import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.slf4j.LoggerFactory
 import java.nio.charset.Charset
 
@@ -48,17 +47,13 @@ class KafkaMessageProducerService(
         const val MAX_ERR_MSG_LEN = 128
     }
 
-    override suspend fun sendMessageNB(message: Any): Boolean {
+    override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
-        return sendMessageNB(messageProducerProperties.topic!!, message)
-    }
-
-    override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
-        checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
-        return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+        return sendMessageNB(key, messageProducerProperties.topic!!, message, headers)
     }
 
     override suspend fun sendMessageNB(
+        key: String,
         topic: String,
         message: Any,
         headers: MutableMap<String, String>?
@@ -73,7 +68,7 @@ class KafkaMessageProducerService(
             else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
         }
 
-        val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+        val record = ProducerRecord<String, ByteArray>(topic, key, byteArrayMessage)
         val recordHeaders = record.headers()
         messageLoggerService.messageProducing(recordHeaders)
         headers?.let {
index 60f2dfa..4340e48 100644 (file)
@@ -17,6 +17,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.streams.KafkaStreams
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
@@ -39,11 +40,11 @@ open class KafkaStreamsConsumerService(private val messageConsumerProperties: Me
         return configProperties
     }
 
-    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         throw BluePrintProcessorException("not implemented")
     }
 
-    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         throw BluePrintProcessorException("not implemented")
     }
 
index fdf6e48..77bdbe4 100644 (file)
@@ -51,6 +51,7 @@ import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
+import java.nio.charset.Charset
 import kotlin.test.assertEquals
 import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
@@ -133,9 +134,14 @@ open class BlueprintMessageConsumerServiceTest {
 
             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
             val channel = spyBlueprintMessageConsumerService.subscribe(null)
+            var i = 0
             launch {
                 channel.consumeEach {
-                    assertTrue(it.startsWith("I am message"), "failed to get the actual message")
+                    ++i
+                    val key = it.key()
+                    val value = String(it.value(), Charset.defaultCharset())
+                    assertTrue(value.startsWith("I am message"), "failed to get the actual message")
+                    assertEquals("key_$i", key)
                 }
             }
             delay(10)
@@ -268,6 +274,7 @@ open class BlueprintMessageConsumerServiceTest {
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.toString()
                     blueprintMessageProducerService.sendMessageNB(
+                        key = "mykey",
                         message = "this is my message($it)",
                         headers = headers
                     )
index 537dab1..881f0b4 100644 (file)
@@ -88,7 +88,7 @@ open class BlueprintMessageProducerServiceTest {
 
             every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
 
-            val response = spyBluePrintMessageProducerService.sendMessage("Testing message")
+            val response = spyBluePrintMessageProducerService.sendMessage("mykey", "Testing message")
             assertTrue(response, "failed to get command response")
         }
     }
index c30ab9b..44990ae 100644 (file)
@@ -132,6 +132,7 @@ class KafkaStreamsConsumerServiceTest {
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.toString()
                     blueprintMessageProducerService.sendMessageNB(
+                        key = "mykey",
                         message = "this is my message($it)",
                         headers = headers
                     )
index 1f3dd65..1ccf230 100644 (file)
@@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
 import org.springframework.boot.context.event.ApplicationReadyEvent
 import org.springframework.context.event.EventListener
 import org.springframework.stereotype.Service
+import java.nio.charset.Charset
+import java.util.UUID
 import java.util.concurrent.Phaser
 import javax.annotation.PreDestroy
 
@@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer(
                     launch {
                         try {
                             ph.register()
-                            log.trace("Consumed Message : $message")
-                            val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+                            val key = message.key() ?: UUID.randomUUID().toString()
+                            val value = String(message.value(), Charset.defaultCharset())
+                            log.trace("Consumed Message : key($key) value($value)")
+                            val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
-                            blueprintMessageProducerService.sendMessage(executionServiceOutput)
+                            blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
                         } catch (e: Exception) {
                             log.error("failed in processing the consumed message : $message", e)
                         } finally {
index fca7398..93885bf 100644 (file)
@@ -71,9 +71,10 @@ class KafkaPublishAuditService(
      */
     override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
         val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
+        val key = secureExecutionServiceInput.actionIdentifiers.blueprintName
         try {
             this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
-            this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+            this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
         } catch (e: Exception) {
             var errMsg =
                     if (e.message != null) "ERROR : ${e.message}"
@@ -89,9 +90,10 @@ class KafkaPublishAuditService(
      */
     override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
         executionServiceOutput.correlationUUID = correlationUUID
+        val key = executionServiceOutput.actionIdentifiers.blueprintName
         try {
             this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
-            this.outputInstance!!.sendMessage(executionServiceOutput)
+            this.outputInstance!!.sendMessage(key, executionServiceOutput)
         } catch (e: Exception) {
             var errMsg =
                 if (e.message != null) "ERROR : $e"