* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
import java.util.concurrent.ConcurrentLinkedQueue
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
val messagesCount: Int by lazy {
messages.size
}
consumedMessages.clear()
}
- fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
consumedMessages.add(record.value())
}
-
companion object {
private val logger = Logger(Consumer::class)
}
}
class ConsumerFactory(private val kafkaBootstrapServers: String) {
- fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
- return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
- }
+ fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
+ KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
}