*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.kafka.receiver.ReceiverRecord
import java.util.concurrent.ConcurrentLinkedQueue
/**
internal interface ConsumerStateProvider {
fun currentState(): ConsumerState
- fun reset()
}
-internal class Consumer : ConsumerStateProvider {
+internal class Consumer : KafkaConsumer, ConsumerStateProvider {
private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
override fun reset() = consumedMessages.clear()
- fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
consumedMessages.add(record.value())
}
}
}
-internal class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumersForTopics(kafkaTopics: Set<String>): Map<String, Consumer> =
- KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource ->
- val topicToConsumer = kafkaTopics.associate { it to Consumer() }
- kafkaSource.start()
- .map {
- val topic = it.topic()
- topicToConsumer.get(topic)?.update(it)
- ?: logger.warn { "No consumer configured for topic $topic" }
- }.subscribe()
- topicToConsumer
- }
+internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) {
- companion object {
- private val logger = Logger(ConsumerFactory::class)
- }
+ private val consumerProvider = { Consumer() }
+
+ fun createConsumersFor(topics: Set<String>) =
+ ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider)
+ .mapValues { it.value as Consumer }
}