Kafka Back pressure configuration 27/96627/1
authorBrinda Santh <bs2796@att.com>
Fri, 4 Oct 2019 16:23:51 +0000 (12:23 -0400)
committerBrinda Santh <bs2796@att.com>
Fri, 4 Oct 2019 16:23:51 +0000 (12:23 -0400)
Issue-ID: CCSDK-1666
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I173672b14e07a89e0bd2d1ea0e47543b37f389f8

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt

index ab04054..1cd8a2a 100644 (file)
@@ -36,6 +36,7 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
     var clientId: String? = null
     var topic: String? = null
     var pollMillSec: Long = 1000
+    var pollRecords: Int = -1
 }
 
 open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
index 5a9e61b..b5d444a 100644 (file)
@@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService(
         if (messageConsumerProperties.clientId != null) {
             configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
         }
+        /** To handle Back pressure, Get only configured record for processing */
+        if (messageConsumerProperties.pollRecords > 0) {
+            configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
+        }
         // TODO("Security Implementation based on type")
         /** add or override already set properties */
         additionalConfig?.let { configProperties.putAll(it) }
@@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService(
             kafkaConsumer!!.use { kc ->
                 while (keepGoing) {
                     val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+                    log.info("Consumed Records : ${consumerRecords.count()}")
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
                             /** execute the command block */
index 2b84eaa..f4e85a9 100644 (file)
@@ -52,6 +52,7 @@ import kotlin.test.assertTrue
     "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
     "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
     "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+    "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
 
     "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
     "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
@@ -129,11 +130,11 @@ open class BlueprintMessageConsumerServiceTest {
                     .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
             launch {
                 repeat(5) {
-                    delay(1000)
+                    delay(100)
                     blueprintMessageProducerService.sendMessage("this is my message($it)")
                 }
             }
-            delay(10000)
+            delay(5000)
             blueprintMessageConsumerService.shutDown()
         }
     }