package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory
-import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.ConcurrentLinkedQueue
internal interface ConsumerStateProvider {
fun currentState(): ConsumerState
+ fun reset()
}
-internal class Consumer : KafkaConsumer, ConsumerStateProvider {
+internal class Consumer : ConsumerStateProvider {
private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
override fun reset() = consumedMessages.clear()
- override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
+ fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
consumedMessages.add(record.value())
}
internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) {
- private val consumerProvider = { Consumer() }
-
fun createConsumersFor(topics: Set<String>) =
- ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider)
- .mapValues { it.value as Consumer }
+ KafkaSource.create(kafkaBootstrapServers, topics).let { kafkaSource ->
+ val topicToConsumer = topics.associateWith { Consumer() }
+ kafkaSource.start()
+ .map {
+ val topic = it.topic()
+ topicToConsumer.get(topic)?.update(it)
+ ?: logger.warn { "No consumer configured for topic $topic" }
+ }.subscribe()
+ topicToConsumer
+ }
+
+ companion object {
+ private val logger = Logger(DcaeAppConsumerFactory::class)
+ }
}