Enable Kafka consumer offset committing 38/87438/1
authorFilip Krzywka <filip.krzywka@nokia.com>
Fri, 10 May 2019 09:53:21 +0000 (11:53 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Fri, 10 May 2019 09:53:51 +0000 (11:53 +0200)
It appears that reactor-kafka is setting auto.commit property to false,
which makes our CSITs fail nondeterministically due to automatic reset of
consumer offset.

By acknowledging manually, we will mark every message for committing,
which will be performed according to ConsumerConfiguration.
This way, Kafka broker should persist consumer offset.

Change-Id: I0c5156ff8df9bb3341e733e50a3c6866fdd94976
Issue-ID: DCAEGEN2-1495
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt

index b5b692d..a108eba 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018,2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
 
     fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
             receiver.receive()
+                    .doOnNext { it.receiverOffset().acknowledge() }
                     .also { logger.info { "Started Kafka source" } }
 
     companion object {
@@ -50,10 +51,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
         private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
         private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
 
-        fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
-            return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
-        }
-
+        fun create(bootstrapServers: String, topics: Set<String>) =
+                KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
 
         fun createReceiverOptions(bootstrapServers: String,
                                   topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
@@ -64,6 +63,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
                     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
                     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+                    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000",
+
 
                     CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
                     SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,