Reorganize kafka module
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-dcae-app-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / dcaeapp / impl / consumer.kt
index 8a7aafb..6ee640a 100644 (file)
@@ -20,8 +20,7 @@
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory
-import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.util.concurrent.ConcurrentLinkedQueue
 
@@ -41,9 +40,10 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArr
 
 internal interface ConsumerStateProvider {
     fun currentState(): ConsumerState
+    fun reset()
 }
 
-internal class Consumer : KafkaConsumer, ConsumerStateProvider {
+internal class Consumer : ConsumerStateProvider {
 
     private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
 
@@ -51,7 +51,7 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider {
 
     override fun reset() = consumedMessages.clear()
 
-    override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
+    fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
         consumedMessages.add(record.value())
     }
@@ -63,9 +63,19 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider {
 
 internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) {
 
-    private val consumerProvider = { Consumer() }
-
     fun createConsumersFor(topics: Set<String>) =
-            ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider)
-                    .mapValues { it.value as Consumer }
+            KafkaSource.create(kafkaBootstrapServers, topics).let { kafkaSource ->
+                val topicToConsumer = topics.associateWith { Consumer() }
+                kafkaSource.start()
+                        .map {
+                            val topic = it.topic()
+                            topicToConsumer.get(topic)?.update(it)
+                                    ?: logger.warn { "No consumer configured for topic $topic" }
+                        }.subscribe()
+                topicToConsumer
+            }
+
+    companion object {
+        private val logger = Logger(DcaeAppConsumerFactory::class)
+    }
 }