import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.consumer.*
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.clients.consumer.MockConsumer
+import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.kafka.common.TopicPartition
import org.junit.Test
import org.junit.runner.RunWith
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
-
@RunWith(SpringRunner::class)
@DirtiesContext
-@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
- BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class])
-@TestPropertySource(properties =
-["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
- "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
- "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
- "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
- "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
-
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
- "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
- "blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
-])
+@ContextConfiguration(
+ classes = [BluePrintMessageLibConfiguration::class,
+ BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+ properties =
+ ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
+ "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
+ "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+
+ "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+ "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ ]
+)
open class BlueprintMessageConsumerServiceTest {
+
val log = logger(BlueprintMessageConsumerServiceTest::class)
@Autowired
fun testKafkaBasicAuthConsumerService() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
for (i in 1..10) {
- val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
- "I am message $i".toByteArray())
+ val record = ConsumerRecord<String, ByteArray>(
+ topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray()
+ )
mockKafkaConsumer.addRecord(record)
}
fun testKafkaBasicAuthConsumerWithDynamicFunction() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
for (i in 1..10) {
- val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
- "I am message $i".toByteArray())
+ val record = ConsumerRecord<String, ByteArray>(
+ topic, 1, i.toLong(), "key_$i",
+ "I am message $i".toByteArray()
+ )
mockKafkaConsumer.addRecord(record)
}
every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
/** Test Consumer Function implementation */
val consumerFunction = object : KafkaConsumerRecordsFunction {
- override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
- consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
+ override suspend fun invoke(
+ messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>,
+ consumerRecords: ConsumerRecords<*, *>
+ ) {
val count = consumerRecords.count()
log.trace("Received Message count($count)")
}
}
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
- //@Test
+ // @Test
fun testKafkaIntegration() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val channel = blueprintMessageConsumerService.subscribe(null)
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
delay(100)
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
- blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
- headers = headers)
+ blueprintMessageProducerService.sendMessageNB(
+ message = "this is my message($it)",
+ headers = headers
+ )
}
}
delay(5000)
blueprintMessageConsumerService.shutDown()
}
}
-}
\ No newline at end of file
+}