Remove IO monad usage from simulators
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-dcae-app-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / dcaeapp / impl / MessageStreamValidation.kt
index 47a2d22..144aab0 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
@@ -32,6 +27,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIX
 import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import java.io.InputStream
 import javax.json.Json
 
@@ -39,20 +35,21 @@ class MessageStreamValidation(
         private val messageGenerator: VesEventGenerator,
         private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
 
-    fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
-            IO.monadError().bindingCatch {
-                val messageParams = parseMessageParams(jsonDescription)
-                logger.debug { "Parsed message parameters: $messageParams" }
-
-                val expectedEvents = generateEvents(messageParams).bind()
-                val actualEvents = decodeConsumedEvents(consumedMessages)
-
-                if (shouldValidatePayloads(messageParams))
-                    expectedEvents == actualEvents
-                else
-                    validateHeaders(actualEvents, expectedEvents)
-
-            }.fix()
+    fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>) =
+            Mono
+                    .fromSupplier { parseMessageParams(jsonDescription) }
+                    .doOnNext {
+                        logger.debug { "Parsed message parameters: $it" }
+                    }
+                    .flatMap { messageParams ->
+                        val actualEvents = decodeConsumedEvents(consumedMessages)
+                        generateEvents(messageParams).map {
+                            if (shouldValidatePayloads(messageParams))
+                                it == actualEvents
+                            else
+                                validateHeaders(actualEvents, it)
+                        }
+                    }
 
     private fun parseMessageParams(input: InputStream): List<VesEventParameters> {
         val paramsArray = Json.createReader(input).readArray()
@@ -97,11 +94,10 @@ class MessageStreamValidation(
         return generatedHeaders == consumedHeaders
     }
 
-    private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux
+    private fun generateEvents(parameters: List<VesEventParameters>): Mono<List<VesEvent>> = Flux
             .fromIterable(parameters)
             .flatMap { messageGenerator.createMessageFlux(it) }
             .collectList()
-            .asIo()
 
     private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
             consumedMessages.map(VesEvent::parseFrom)