partitionsEndMap[partition] = records
topicsCollection.add(partition.topic())
}
- val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
+ val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
mockKafkaConsumer.subscribe(topicsCollection)
mockKafkaConsumer.rebalance(partitions)
mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
for (i in 1..10) {
- val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
- "I am message $i")
+ val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray())
mockKafkaConsumer.addRecord(record)
}
launch {
repeat(5) {
delay(100)
- blueprintMessageProducerService.sendMessage("this is my message($it)")
+ val headers: MutableMap<String, String> = hashMapOf()
+ headers["id"] = it.toString()
+ blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+ headers = headers)
}
}
delay(5000)