Remove IO monad usage from simulators 40/83440/4
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>
Wed, 20 Mar 2019 13:59:24 +0000 (14:59 +0100)
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>
Thu, 28 Mar 2019 10:53:11 +0000 (11:53 +0100)
Change-Id: I1c470777b91230f4a44a4960ca534e4b20c1ac43
Issue-ID: DCAEGEN2-1372
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/netty.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt

index f7d94de..28866f3 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
 import arrow.core.getOrElse
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.bindingCatch
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
 import java.io.InputStream
+import java.lang.IllegalArgumentException
 import java.util.concurrent.atomic.AtomicReference
 
 /**
@@ -39,7 +37,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
 
     fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
 
-    fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch {
+    fun listenToTopics(topics: Set<String>) {
         if (topics.isEmpty() || topics.any { it.isBlank() }) {
             val message = "Topic list cannot be empty or contain empty elements, topics: $topics"
             logger.info { message }
@@ -47,17 +45,15 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
         }
 
         logger.info { "Received new configuration. Creating consumer for topics: $topics" }
-        consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
-    }.fix()
+        consumerState.set(consumerFactory.createConsumerForTopics(topics))
+    }
 
     fun state() = consumerState.getOption().map { it.currentState() }
 
-    fun resetState(): IO<Unit> = consumerState.getOption().fold(
-            { IO.unit },
-            { it.reset() }
-    )
+    fun resetState() = consumerState.getOption().fold({ }, { it.reset() })
+
 
-    fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages())
+    fun validate(jsonDescription: InputStream)= messageStreamValidation.validate(jsonDescription, currentMessages())
 
     private fun currentMessages(): List<ByteArray> =
             consumerState.getOption()
index 47a2d22..144aab0 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.bindingCatch
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
@@ -32,6 +27,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIX
 import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import java.io.InputStream
 import javax.json.Json
 
@@ -39,20 +35,21 @@ class MessageStreamValidation(
         private val messageGenerator: VesEventGenerator,
         private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
 
-    fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
-            IO.monadError().bindingCatch {
-                val messageParams = parseMessageParams(jsonDescription)
-                logger.debug { "Parsed message parameters: $messageParams" }
-
-                val expectedEvents = generateEvents(messageParams).bind()
-                val actualEvents = decodeConsumedEvents(consumedMessages)
-
-                if (shouldValidatePayloads(messageParams))
-                    expectedEvents == actualEvents
-                else
-                    validateHeaders(actualEvents, expectedEvents)
-
-            }.fix()
+    fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>) =
+            Mono
+                    .fromSupplier { parseMessageParams(jsonDescription) }
+                    .doOnNext {
+                        logger.debug { "Parsed message parameters: $it" }
+                    }
+                    .flatMap { messageParams ->
+                        val actualEvents = decodeConsumedEvents(consumedMessages)
+                        generateEvents(messageParams).map {
+                            if (shouldValidatePayloads(messageParams))
+                                it == actualEvents
+                            else
+                                validateHeaders(actualEvents, it)
+                        }
+                    }
 
     private fun parseMessageParams(input: InputStream): List<VesEventParameters> {
         val paramsArray = Json.createReader(input).readArray()
@@ -97,11 +94,10 @@ class MessageStreamValidation(
         return generatedHeaders == consumedHeaders
     }
 
-    private fun generateEvents(parameters: List<VesEventParameters>): IO<List<VesEvent>> = Flux
+    private fun generateEvents(parameters: List<VesEventParameters>): Mono<List<VesEvent>> = Flux
             .fromIterable(parameters)
             .flatMap { messageGenerator.createMessageFlux(it) }
             .collectList()
-            .asIo()
 
     private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
             consumedMessages.map(VesEvent::parseFrom)
index d2c5b27..5d2977e 100644 (file)
@@ -50,9 +50,9 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
         )
     }
 
-
     fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
-            simulator.listenToTopics(kafkaTopics).map {
+            IO {
+                simulator.listenToTopics(kafkaTopics)
                 HttpServer.create()
                         .host(socketAddress.hostName)
                         .port(socketAddress.port)
@@ -60,22 +60,21 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                         .let { NettyServerHandle(it.bindNow()) }
             }
 
-
     private fun setRoutes(route: HttpServerRoutes) {
         route
                 .put("/configuration/topics") { req, res ->
                     req
                             .receive().aggregate().asString()
                             .flatMap {
-                                val option = simulator.listenToTopics(it)
-                                res.sendOrError(option).then()
+                               res.sendOrError{ simulator.listenToTopics(it) }
                             }
                 }
                 .delete("/messages") { _, res ->
                     logger.info { "Resetting simulator state" }
+
                     res
                             .header("Content-type", CONTENT_TEXT)
-                            .sendOrError(simulator.resetState())
+                            .sendOrError { simulator.resetState() }
                 }
                 .get("/messages/all/count") { _, res ->
                     logger.info { "Processing request for count of received messages" }
@@ -93,12 +92,13 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                 .post("/messages/all/validate") { req, res ->
                     req
                             .receive().aggregate().asInputStream()
-                            .flatMap { body ->
+                            .map {
                                 logger.info { "Processing request for message validation" }
-                                val response =
-                                        simulator.validate(body)
-                                                .map(::resolveValidationResponse)
-                                res.sendAndHandleErrors(response).then()
+                                simulator.validate(it)
+                                        .map(::resolveValidationResponse)
+                            }
+                            .flatMap {
+                                res.sendAndHandleErrors(it)
                             }
                 }
                 .get("/healthcheck") { _, res ->
index 10dedbd..7bab967 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
-import arrow.effects.IO
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
-import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.kafka.receiver.KafkaReceiver
 import reactor.kafka.receiver.ReceiverOptions
@@ -34,11 +32,10 @@ import reactor.kafka.receiver.ReceiverOptions
  */
 class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
 
-    fun start(): IO<Consumer> = IO {
-        val consumer = Consumer()
-        receiver.receive().map(consumer::update).evaluateIo().subscribe()
-        consumer
-    }
+    fun start() = Consumer()
+            .also { consumer ->
+                receiver.receive().map(consumer::update)
+            }
 
     companion object {
         private val logger = Logger(KafkaSource::class)
index 1eefdbd..725248c 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.kafka.receiver.ReceiverRecord
@@ -41,7 +40,7 @@ class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
 
 interface ConsumerStateProvider {
     fun currentState(): ConsumerState
-    fun reset(): IO<Unit>
+    fun reset()
 }
 
 class Consumer : ConsumerStateProvider {
@@ -50,11 +49,9 @@ class Consumer : ConsumerStateProvider {
 
     override fun currentState(): ConsumerState = ConsumerState(consumedMessages)
 
-    override fun reset(): IO<Unit> = IO {
-        consumedMessages.clear()
-    }
+    override fun reset() = consumedMessages.clear()
 
-    fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
+    fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
         consumedMessages.add(record.value())
     }
@@ -65,6 +62,6 @@ class Consumer : ConsumerStateProvider {
 }
 
 class ConsumerFactory(private val kafkaBootstrapServers: String) {
-    fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
+    fun createConsumerForTopics(kafkaTopics: Set<String>): Consumer =
             KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
 }
index 08558d7..e8ac6cd 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ internal class ConsumerTest : Spek({
                         topic = "topic",
                         key = byteArrayOf(1),
                         value = value
-                )).unsafeRunSync()
+                ))
             }
 
             it("should contain one message if it was updated once") {
@@ -60,7 +60,7 @@ internal class ConsumerTest : Spek({
             }
 
             it("should contain empty state message if it was reset after update") {
-                cut.reset().unsafeRunSync()
+                cut.reset()
                 assertEmptyState(cut)
             }
         }
index e1641cb..493100f 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import arrow.core.Left
 import arrow.core.None
 import arrow.core.Some
-import arrow.effects.IO
 import com.google.protobuf.ByteString
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.eq
@@ -30,14 +28,18 @@ import com.nhaarman.mockitokotlin2.mock
 import com.nhaarman.mockitokotlin2.never
 import com.nhaarman.mockitokotlin2.verify
 import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.*
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.mockito.ArgumentMatchers.anySet
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Mono
+import reactor.test.StepVerifier
+import java.lang.IllegalArgumentException
 import java.util.concurrent.ConcurrentLinkedQueue
+import kotlin.test.assertFailsWith
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -55,7 +57,7 @@ internal class DcaeAppSimulatorTest : Spek({
         consumer = mock()
         cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
 
-        whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
+        whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer)
     }
 
     fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
@@ -64,48 +66,36 @@ internal class DcaeAppSimulatorTest : Spek({
         val topics = setOf("perf3gpp", "faults")
 
         it("should fail when topic list is empty") {
-            val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
-            assertThat(result.isLeft()).isTrue()
+            assertFailsWith(IllegalArgumentException::class){
+                cut.listenToTopics(setOf())
+            }
         }
 
         it("should fail when topic list contains empty strings") {
-            val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
-            assertThat(result.isLeft()).isTrue()
+            assertFailsWith(IllegalArgumentException::class){
+                cut.listenToTopics(setOf("perf3gpp", " ", "faults"))
+            }
         }
 
         it("should subscribe to given topics") {
-            cut.listenToTopics(topics).unsafeRunSync()
+            cut.listenToTopics(topics)
             verify(consumerFactory).createConsumerForTopics(topics)
         }
 
         it("should subscribe to given topics when called with comma separated list") {
-            cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
+            cut.listenToTopics("perf3gpp,faults")
             verify(consumerFactory).createConsumerForTopics(topics)
         }
-
-        it("should handle errors") {
-            // given
-            val error = RuntimeException("WTF")
-            whenever(consumerFactory.createConsumerForTopics(anySet()))
-                    .thenReturn(IO.raiseError(error))
-
-            // when
-            val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
-
-            // then
-            assertThat(result).isEqualTo(Left(error))
-        }
     }
 
     describe("state") {
-
         it("should return None when topics hasn't been initialized") {
             assertThat(cut.state()).isEqualTo(None)
         }
 
         describe("when topics are initialized") {
             beforeEachTest {
-                cut.listenToTopics("perf3gpp").unsafeRunSync()
+                cut.listenToTopics("perf3gpp")
             }
 
             it("should return some state when it has been set") {
@@ -119,21 +109,18 @@ internal class DcaeAppSimulatorTest : Spek({
 
     describe("resetState") {
         it("should do nothing when topics hasn't been initialized") {
-            cut.resetState().unsafeRunSync()
+            cut.resetState()
             verify(consumer, never()).reset()
         }
 
         describe("when topics are initialized") {
             beforeEachTest {
-                cut.listenToTopics("perf3gpp").unsafeRunSync()
+                cut.listenToTopics("perf3gpp")
             }
 
             it("should reset the state") {
-                // given
-                whenever(consumer.reset()).thenReturn(IO.unit)
-
                 // when
-                cut.resetState().unsafeRunSync()
+                cut.resetState()
 
                 // then
                 verify(consumer).reset()
@@ -143,29 +130,30 @@ internal class DcaeAppSimulatorTest : Spek({
 
     describe("validate") {
         beforeEachTest {
-            whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
+            whenever(messageStreamValidation.validate(any(), any())).thenReturn(Mono.just(true))
         }
 
         it("should use empty list when consumer is unavailable") {
-            // when
-            val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+            StepVerifier
+                    .create(cut.validate("['The JSON']".byteInputStream()))
+                    .expectNext(true)
+                    .verifyComplete()
 
-            // then
             verify(messageStreamValidation).validate(any(), eq(emptyList()))
-            assertThat(result).isTrue()
         }
 
         it("should delegate to MessageStreamValidation") {
             // given
-            cut.listenToTopics("perf3gpp").unsafeRunSync()
+            cut.listenToTopics("perf3gpp")
             whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
 
-            // when
-            val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+           StepVerifier
+                    .create(cut.validate("['The JSON']".byteInputStream()))
+                   .expectNext(true)
+                    .verifyComplete()
 
             // then
             verify(messageStreamValidation).validate(any(), any())
-            assertThat(result).isTrue()
         }
     }
 })
index bff7709..8886799 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,11 +24,9 @@ import com.google.protobuf.ByteString
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.mock
 import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.utils.assertFailedWithError
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
@@ -38,6 +36,8 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventG
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
+import reactor.test.StepVerifier
+import java.lang.IllegalArgumentException
 import javax.json.stream.JsonParsingException
 
 /**
@@ -60,24 +60,22 @@ internal class MessageStreamValidationTest : Spek({
     }
 
     describe("validate") {
-
         it("should return error when JSON is invalid") {
-            // when
-            val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
-
-            // then
-            result.assertFailedWithError { it is JsonParsingException }
+            StepVerifier
+                    .create(cut.validate("[{invalid json}]".byteInputStream(), listOf()))
+                    .expectError(JsonParsingException::class.java)
+                    .verify()
         }
 
         it("should return error when message param list is empty") {
-            // given
-            givenParsedMessageParameters()
-
-            // when
-            val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
+                // given
+                givenParsedMessageParameters()
 
-            // then
-            result.assertFailedWithError { it is IllegalArgumentException }
+                //when
+                StepVerifier
+                        .create(cut.validate(sampleJsonAsStream(), listOf()))
+                        .expectError(IllegalArgumentException::class.java)
+                        .verify()
         }
 
         describe("when validating headers only") {
@@ -90,11 +88,10 @@ internal class MessageStreamValidationTest : Spek({
                 givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, VALID, 1))
                 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event))
 
-                // when
-                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
-                // then
-                assertThat(result).isTrue()
+                StepVerifier
+                        .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes)))
+                        .expectNext(true)
+                        .verifyComplete()
             }
 
             it("should return true when messages differ with payload only") {
@@ -108,11 +105,10 @@ internal class MessageStreamValidationTest : Spek({
                 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1))
                 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
 
-                // when
-                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
-                // then
-                assertThat(result).isTrue()
+                StepVerifier
+                        .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes)))
+                        .expectNext(true)
+                        .verifyComplete()
             }
 
             it("should return false when messages are different") {
@@ -125,11 +121,10 @@ internal class MessageStreamValidationTest : Spek({
                 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, VALID, 1))
                 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
 
-                // when
-                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
-                // then
-                assertThat(result).isFalse()
+                StepVerifier
+                        .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes)))
+                        .expectNext(false)
+                        .verifyComplete()
             }
         }
 
@@ -143,11 +138,10 @@ internal class MessageStreamValidationTest : Spek({
                 givenParsedMessageParameters(VesEventParameters(event.commonEventHeader, FIXED_PAYLOAD, 1))
                 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(event))
 
-                // when
-                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
-                // then
-                assertThat(result).isTrue()
+                StepVerifier
+                        .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes)))
+                        .expectNext(true)
+                        .verifyComplete()
             }
 
             it("should return false when messages differ with payload only") {
@@ -160,11 +154,10 @@ internal class MessageStreamValidationTest : Spek({
                 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1))
                 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
 
-                // when
-                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
-                // then
-                assertThat(result).isFalse()
+                StepVerifier
+                        .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes)))
+                        .expectNext(false)
+                        .verifyComplete()
             }
 
             it("should return false when messages are different") {
@@ -177,11 +170,10 @@ internal class MessageStreamValidationTest : Spek({
                 givenParsedMessageParameters(VesEventParameters(generatedEvent.commonEventHeader, FIXED_PAYLOAD, 1))
                 whenever(messageGenerator.createMessageFlux(any())).thenReturn(Flux.just(generatedEvent))
 
-                // when
-                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
-
-                // then
-                assertThat(result).isFalse()
+                StepVerifier
+                        .create(cut.validate(jsonAsStream, listOf(receivedMessageBytes)))
+                        .expectNext(false)
+                        .verifyComplete()
             }
         }
     }
index cf338a7..f133d63 100644 (file)
 package org.onap.dcae.collectors.veshv.utils.http
 
 import arrow.core.Either
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
 import reactor.netty.NettyOutbound
 import reactor.netty.http.server.HttpServerResponse
 import javax.json.Json
 
 private val logger = Logger("org.onap.dcae.collectors.veshv.utils.http.netty")
 
-fun HttpServerResponse.sendOrError(action: IO<Unit>): NettyOutbound =
-        sendAndHandleErrors(action.map {
-            Response(
-                    HttpStatus.OK,
-                    Content(
-                            ContentType.JSON,
-                            Json.createObjectBuilder().add("response", "Request accepted").build()
+fun HttpServerResponse.sendOrError(action: ()->Unit) = sendAndHandleErrors(
+        Mono
+                .fromSupplier(action)
+                .map {
+                    Response(
+                            HttpStatus.OK,
+                            Content(
+                                    ContentType.JSON,
+                                    Json.createObjectBuilder().add("response", "Request accepted").build()
+                            )
                     )
-            )
-        })
-
+                }
+)
 
-fun HttpServerResponse.sendAndHandleErrors(response: IO<Response>): NettyOutbound =
-        response.attempt().unsafeRunSync().fold(
-                { err ->
-                    logger.withWarn { log("Error occurred. Sending .", err) }
-                    val message = err.message
-                    sendResponse(errorResponse(message))
-                },
-                {
-                    sendResponse(it)
+fun HttpServerResponse.sendAndHandleErrors(response: Mono<Response>) =
+        response
+                .onErrorResume {
+                    logger.withWarn { log("Error occurred. Sending .", it) }
+                    errorResponse(it.localizedMessage).toMono()
+                }
+                .flatMap {
+                    sendResponse(it).then()
                 }
-        )
 
-fun <A> HttpServerResponse.sendEitherErrorOrResponse(response: Either<A, Response>): NettyOutbound =
+fun <A> HttpServerResponse.sendEitherErrorOrResponse(response: Either<A, Response>) =
         when (response) {
-            is Either.Left -> sendResponse(errorResponse(response.a.toString()))
-            is Either.Right -> sendAndHandleErrors(IO.just(response.b))
+            is Either.Left -> sendResponse(errorResponse(response.a.toString())).then()
+            is Either.Right -> sendAndHandleErrors(Mono.just(response.b))
         }
 
-private fun HttpServerResponse.sendResponse(response: Response): NettyOutbound {
+
+fun HttpServerResponse.sendResponse(response: Response): NettyOutbound {
     val respWithStatus = status(response.status.number)
     val responseContent = response.content
 
-    return respWithStatus.sendString(Mono.just(responseContent.serializer.run { responseContent.value.show() }))
+    return respWithStatus.sendString(
+            Mono.just(responseContent.serializer.run { responseContent.value.show() })
+    )
 }
 
 private fun errorResponse(message: String?): Response =
index 7df7283..fb2c532 100644 (file)
@@ -72,7 +72,7 @@ internal class XnfApiServer(
                         is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" }
                         is Either.Right -> logger.info { "Scenario started, details: ${id.b}" }
                     }
-                    res.sendEitherErrorOrResponse(id).then()
+                    res.sendEitherErrorOrResponse(id)
                 }
     }
 
@@ -90,7 +90,7 @@ internal class XnfApiServer(
         val status = ongoingSimulations.status(id)
         val response = Responses.statusResponse(status.toString(), status.message)
         logger.info { "Task $id status: $response" }
-        return res.sendAndHandleErrors(IO.just(response)).then()
+        return res.sendAndHandleErrors(Mono.just(response))
     }
 
     companion object {