Improve DCAE APP Simulator coverage
[dcaegen2/collectors/hv-ves.git] / hv-collector-dcae-app-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / dcaeapp / impl / consumer.kt
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
 import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.kafka.receiver.ReceiverRecord
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
     val messagesCount: Int by lazy {
         messages.size
     }
@@ -53,19 +54,17 @@ class Consumer : ConsumerStateProvider {
         consumedMessages.clear()
     }
 
-    fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+    fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
         consumedMessages.add(record.value())
     }
 
-
     companion object {
         private val logger = Logger(Consumer::class)
     }
 }
 
 class ConsumerFactory(private val kafkaBootstrapServers: String) {
-    fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
-        return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
-    }
+    fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
+            KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
 }