Enable Kafka consumer offset committing 18/87418/2 dublin
authorFilip Krzywka <filip.krzywka@nokia.com>
Fri, 10 May 2019 06:08:08 +0000 (08:08 +0200)
committerFilip Krzywka <filip.krzywka@nokia.com>
Fri, 10 May 2019 06:35:12 +0000 (08:35 +0200)
It appears that reactor-kafka sets the enable.auto.commit property to
false, which makes our CSITs fail nondeterministically: with no offsets
ever committed, a restarted consumer falls back to the auto.offset.reset
policy and loses its position.

When the consumer commits offsets itself, the Kafka broker should
persist the consumer offset.
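
For reference, the receive-with-auto-ack pattern adopted below looks
roughly like this sketch (the group id and function name are made up
for illustration; the real receiver options, including the SASL
settings, are built in KafkaSource.createReceiverOptions):

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import reactor.core.publisher.Flux
    import reactor.kafka.receiver.KafkaReceiver
    import reactor.kafka.receiver.ReceiverOptions

    fun consumedRecords(bootstrapServers: String,
                        topics: Set<String>): Flux<ConsumerRecord<ByteArray, ByteArray>> {
        val props = mapOf<String, Any>(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
                ConsumerConfig.GROUP_ID_CONFIG to "hv-ves-sim", // hypothetical group id
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java)
        val options = ReceiverOptions.create<ByteArray, ByteArray>(props)
                .subscription(topics)
        // receiveAutoAck() emits one inner Flux per consumer poll; records are
        // acknowledged once each inner Flux completes, and acknowledged offsets
        // are then committed to the broker periodically by reactor-kafka.
        return KafkaReceiver.create(options)
                .receiveAutoAck()
                .concatMap { batch -> batch }
    }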

Change-Id: Ie4af7ef516dcde450cc31a499d82c5a90a0ad6c1
Issue-ID: DCAEGEN2-1495
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt

index b5b692d..eb02713 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018,2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM
@@ -29,7 +30,6 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.kafka.receiver.KafkaReceiver
 import reactor.kafka.receiver.ReceiverOptions
-import reactor.kafka.receiver.ReceiverRecord
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -37,8 +37,9 @@ import reactor.kafka.receiver.ReceiverRecord
  */
 internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
 
-    fun start(): Flux<ReceiverRecord<ByteArray, ByteArray>> =
-            receiver.receive()
+    fun start(): Flux<ConsumerRecord<ByteArray, ByteArray>> =
+            receiver.receiveAutoAck()
+                    .concatMap { it }
                     .also { logger.info { "Started Kafka source" } }
 
     companion object {
@@ -50,10 +51,8 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
         private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
         private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
 
-        fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
-            return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
-        }
-
+        fun create(bootstrapServers: String, topics: Set<String>) =
+                KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
 
         fun createReceiverOptions(bootstrapServers: String,
                                   topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
index 2de89aa..6ecc875 100644 (file)
@@ -19,9 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.ReceiverRecord
 import java.util.concurrent.ConcurrentLinkedQueue
 
 /**
@@ -51,7 +51,7 @@ internal class Consumer : ConsumerStateProvider {
 
     override fun reset() = consumedMessages.clear()
 
-    fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+    fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
         consumedMessages.add(record.value())
     }
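
With these changes, the simulator can stream records from the source
straight into the stats consumer. A minimal wiring sketch (the bootstrap
address and topic name are placeholders, not values from this change):

    val source = KafkaSource.create("message-router-kafka:9092", setOf("HV_VES_PERF3GPP"))
    val consumer = Consumer()
    // Every ConsumerRecord emitted by start() updates the consumed-message stats.
    source.start().subscribe(consumer::update)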