Add Message Header support for tracing.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerServiceTest.kt
index f4e85a9..86c2ec5 100644 (file)
@@ -87,14 +87,14 @@ open class BlueprintMessageConsumerServiceTest {
                 partitionsEndMap[partition] = records
                 topicsCollection.add(partition.topic())
             }
-            val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
+            val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
             mockKafkaConsumer.subscribe(topicsCollection)
             mockKafkaConsumer.rebalance(partitions)
             mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
             mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
             for (i in 1..10) {
-                val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
-                        "I am message $i")
+                val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+                        "I am message $i".toByteArray())
                 mockKafkaConsumer.addRecord(record)
             }
 
@@ -131,7 +131,10 @@ open class BlueprintMessageConsumerServiceTest {
             launch {
                 repeat(5) {
                     delay(100)
-                    blueprintMessageProducerService.sendMessage("this is my message($it)")
+                    val headers: MutableMap<String, String> = hashMapOf()
+                    headers["id"] = it.toString()
+                    blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+                            headers = headers)
                 }
             }
             delay(5000)