import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
- "server(${messageConsumerProperties.bootstrapServers})'s " +
- "topics(${messageConsumerProperties.bootstrapServers})"
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
}
kafkaConsumer!!.subscribe(topics)
log.trace("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
- /** execute the command block */
- if (!channel.isClosedForSend) {
- channel.send(consumerRecord)
- } else {
- log.error("Channel is closed to receive message")
+ launch {
+ /** execute the command block */
+ if (!channel.isClosedForSend) {
+ channel.send(consumerRecord)
+ } else {
+ log.error("Channel is closed to receive message")
+ }
}
}
}
checkNotNull(kafkaConsumer) {
"failed to create kafka consumer for " +
- "server(${messageConsumerProperties.bootstrapServers})'s " +
- "topics(${messageConsumerProperties.bootstrapServers})"
+ "server(${messageConsumerProperties.bootstrapServers})'s " +
+ "topics(${messageConsumerProperties.bootstrapServers})"
}
kafkaConsumer!!.subscribe(topics)