Implement message counting in consumer
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-dcae-app-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / dcaeapp / impl / consumer.kt
index 2de89aa..8a7aafb 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.ReceiverRecord
 import java.util.concurrent.ConcurrentLinkedQueue
 
 /**
@@ -40,10 +41,9 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArr
 
 internal interface ConsumerStateProvider {
     fun currentState(): ConsumerState
-    fun reset()
 }
 
-internal class Consumer : ConsumerStateProvider {
+internal class Consumer : KafkaConsumer, ConsumerStateProvider {
 
     private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
 
@@ -51,7 +51,7 @@ internal class Consumer : ConsumerStateProvider {
 
     override fun reset() = consumedMessages.clear()
 
-    fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+    override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
         consumedMessages.add(record.value())
     }
@@ -61,20 +61,11 @@ internal class Consumer : ConsumerStateProvider {
     }
 }
 
-internal class ConsumerFactory(private val kafkaBootstrapServers: String) {
-    fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> =
-            KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource ->
-                val topicToConsumer = kafkaTopics.associate { it to Consumer() }
-                kafkaSource.start()
-                        .map {
-                            val topic = it.topic()
-                            topicToConsumer.get(topic)?.update(it)
-                                    ?: logger.warn { "No consumer configured for topic $topic" }
-                        }.subscribe()
-                topicToConsumer
-            }
+internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) {
 
-    companion object {
-        private val logger = Logger(ConsumerFactory::class)
-    }
+    private val consumerProvider = { Consumer() }
+
+    fun createConsumersFor(topics: Set<String>) =
+            ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider)
+                    .mapValues { it.value as Consumer }
 }