KafkaMessageConsumerService: 'launch' was missing 04/111804/1
authorOleg Mitsura <oleg.mitsura@amdocs.com>
Thu, 27 Aug 2020 14:21:29 +0000 (10:21 -0400)
committerOleg Mitsura <oleg.mitsura@amdocs.com>
Thu, 27 Aug 2020 14:23:31 +0000 (10:23 -0400)
Issue-ID: CCSDK-2704

This was accidentally removed few commits back.

Signed-off-by: Oleg Mitsura <oleg.mitsura@amdocs.com>
Change-Id: I8f08c72e8d5695c1262aad2d10d1081bbabbdfcb

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt

index a0932e9..83cc0e0 100644 (file)
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -61,8 +62,8 @@ open class KafkaMessageConsumerService(
 
         checkNotNull(kafkaConsumer) {
             "failed to create kafka consumer for " +
-                    "server(${messageConsumerProperties.bootstrapServers})'s " +
-                    "topics(${messageConsumerProperties.bootstrapServers})"
+                "server(${messageConsumerProperties.bootstrapServers})'s " +
+                "topics(${messageConsumerProperties.bootstrapServers})"
         }
 
         kafkaConsumer!!.subscribe(topics)
@@ -76,11 +77,13 @@ open class KafkaMessageConsumerService(
                     log.trace("Consumed Records : ${consumerRecords.count()}")
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
-                            /** execute the command block */
-                            if (!channel.isClosedForSend) {
-                                channel.send(consumerRecord)
-                            } else {
-                                log.error("Channel is closed to receive message")
+                            launch {
+                                /** execute the command block */
+                                if (!channel.isClosedForSend) {
+                                    channel.send(consumerRecord)
+                                } else {
+                                    log.error("Channel is closed to receive message")
+                                }
                             }
                         }
                     }
@@ -111,8 +114,8 @@ open class KafkaMessageConsumerService(
 
         checkNotNull(kafkaConsumer) {
             "failed to create kafka consumer for " +
-                    "server(${messageConsumerProperties.bootstrapServers})'s " +
-                    "topics(${messageConsumerProperties.bootstrapServers})"
+                "server(${messageConsumerProperties.bootstrapServers})'s " +
+                "topics(${messageConsumerProperties.bootstrapServers})"
         }
 
         kafkaConsumer!!.subscribe(topics)