Kafka Back pressure configuration
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / KafkaBasicAuthMessageConsumerService.kt
index 5a9e61b..b5d444a 100644 (file)
@@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService(
         if (messageConsumerProperties.clientId != null) {
             configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
         }
+        /** To handle Back pressure, Get only configured record for processing */
+        if (messageConsumerProperties.pollRecords > 0) {
+            configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
+        }
         // TODO("Security Implementation based on type")
         /** add or override already set properties */
         additionalConfig?.let { configProperties.putAll(it) }
@@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService(
             kafkaConsumer!!.use { kc ->
                 while (keepGoing) {
                     val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+                    log.info("Consumed Records : ${consumerRecords.count()}")
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
                             /** execute the command block */