+ @Test
+ fun testKafkaBasicAuthConsumerWithDynamicFunction() {
+ runBlocking {
+ val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+ val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
+
+ val topic = "default-topic"
+ val partitions: MutableList<TopicPartition> = arrayListOf()
+ val topicsCollection: MutableList<String> = arrayListOf()
+ partitions.add(TopicPartition(topic, 1))
+ val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+ val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+
+ val records: Long = 10
+ partitions.forEach { partition ->
+ partitionsBeginningMap[partition] = 0L
+ partitionsEndMap[partition] = records
+ topicsCollection.add(partition.topic())
+ }
+ val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
+ mockKafkaConsumer.subscribe(topicsCollection)
+ mockKafkaConsumer.rebalance(partitions)
+ mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
+ mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
+ for (i in 1..10) {
+ val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray())
+ mockKafkaConsumer.addRecord(record)
+ }
+
+ every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
+ /** Test Consumer Function implementation */
+ val consumerFunction = object : KafkaConsumerRecordsFunction {
+ override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
+ val count = consumerRecords.count()
+ log.trace("Received Message count($count)")
+ }
+ }
+ spyBlueprintMessageConsumerService.consume(consumerFunction)
+ delay(10)
+ spyBlueprintMessageConsumerService.shutDown()
+ }
+ }
+