Get rid of arrow-effects usage 11/84211/2
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 3 Apr 2019 13:07:22 +0000 (15:07 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 4 Apr 2019 10:57:15 +0000 (12:57 +0200)
Also clean-up dependencies + use Kotlin BOM to force single
kotlin-stdlib on classpath.

Issue-ID: DCAEGEN2-1392
Change-Id: I447c4686707de81f35f7734255ce0b13c997c4a4
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
34 files changed:
pom.xml
sources/hv-collector-commandline/pom.xml
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/ArgBasedConfiguration.kt
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/WrongArgumentError.kt
sources/hv-collector-commandline/src/main/kotlin/org/onap/dcae/collectors/veshv/commandline/extensions.kt
sources/hv-collector-core/pom.xml
sources/hv-collector-ct/pom.xml
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
sources/hv-collector-dcae-app-simulator/pom.xml
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
sources/hv-collector-domain/pom.xml
sources/hv-collector-health-check/pom.xml
sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
sources/hv-collector-main/pom.xml
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
sources/hv-collector-ssl/pom.xml
sources/hv-collector-utils/pom.xml
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/process/process.kt [moved from sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt with 50% similarity]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt [new file with mode: 0644]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
sources/hv-collector-xnf-simulator/pom.xml
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt

diff --git a/pom.xml b/pom.xml
index 8d5a2e6..c0d9f6d 100644 (file)
--- a/pom.xml
+++ b/pom.xml
 
     <properties>
         <kotlin.version>1.3.21</kotlin.version>
-        <arrow.version>0.8.0</arrow.version>
+        <arrow.version>0.9.0</arrow.version>
         <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
         <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
         <jacoco.version>0.8.2</jacoco.version>
-        <detekt.version>1.0.0-RC11</detekt.version>
+        <detekt.version>1.0.0-RC14</detekt.version>
         <sdk.version>1.1.4-SNAPSHOT</sdk.version>
 
         <!-- Protocol buffers -->
             </dependency>
             <dependency>
                 <groupId>org.jetbrains.kotlin</groupId>
-                <artifactId>kotlin-stdlib-jdk8</artifactId>
-                <version>${kotlin.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.jetbrains.kotlin</groupId>
-                <artifactId>kotlin-reflect</artifactId>
+                <artifactId>kotlin-bom</artifactId>
                 <version>${kotlin.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
             </dependency>
             <dependency>
                 <groupId>org.jetbrains.kotlin</groupId>
                 <artifactId>kotlin-compiler-embeddable</artifactId>
                 <version>${kotlin.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.jetbrains.kotlin</groupId>
-                <artifactId>kotlin-script-runtime</artifactId>
-                <version>${kotlin.version}</version>
-                <scope>runtime</scope>
-            </dependency>
             <dependency>
                 <groupId>org.jetbrains.kotlin</groupId>
                 <artifactId>kotlin-script-util</artifactId>
             <dependency>
                 <groupId>org.jetbrains.kotlinx</groupId>
                 <artifactId>kotlinx-coroutines-core</artifactId>
-                <version>1.0.0</version>
+                <version>1.1.1</version>
             </dependency>
             <dependency>
                 <groupId>com.google.code.gson</groupId>
             </dependency>
             <dependency>
                 <groupId>io.arrow-kt</groupId>
-                <artifactId>arrow-core</artifactId>
+                <artifactId>arrow-core-data</artifactId>
                 <version>${arrow.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.jetbrains.kotlin</groupId>
-                        <artifactId>kotlin-stdlib</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>org.jetbrains.kotlin</groupId>
-                        <artifactId>kotlin-stdlib-jdk7</artifactId>
-                    </exclusion>
-                </exclusions>
             </dependency>
             <dependency>
                 <groupId>io.arrow-kt</groupId>
-                <artifactId>arrow-syntax</artifactId>
+                <artifactId>arrow-core-extensions</artifactId>
                 <version>${arrow.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.arrow-kt</groupId>
-                <artifactId>arrow-instances-core</artifactId>
+                <artifactId>arrow-extras-data</artifactId>
                 <version>${arrow.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.arrow-kt</groupId>
-                <artifactId>arrow-instances-data</artifactId>
-                <version>${arrow.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>io.arrow-kt</groupId>
-                <artifactId>arrow-effects</artifactId>
-                <version>${arrow.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>io.arrow-kt</groupId>
-                <artifactId>arrow-effects-instances</artifactId>
+                <artifactId>arrow-syntax</artifactId>
                 <version>${arrow.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.arrow-kt</groupId>
-                <artifactId>arrow-effects-reactor</artifactId>
+                <artifactId>arrow-typeclasses</artifactId>
                 <version>${arrow.version}</version>
             </dependency>
             <dependency>
             <dependency>
                 <groupId>com.nhaarman.mockitokotlin2</groupId>
                 <artifactId>mockito-kotlin</artifactId>
-                <version>2.0.0</version>
+                <version>2.1.0</version>
                 <scope>test</scope>
             </dependency>
             <dependency>
index 078a3cb..a2ab34f 100644 (file)
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.jetbrains.kotlin</groupId>
-      <artifactId>kotlin-reflect</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.jetbrains.kotlin</groupId>
-      <artifactId>kotlin-test</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.jetbrains.spek</groupId>
       <artifactId>spek-api</artifactId>
index d8c83ea..4656d46 100644 (file)
@@ -36,7 +36,7 @@ abstract class ArgBasedConfiguration<T>(private val parser: CommandLineParser) {
             Try { parseArgumentsArray(args) }
                     .toEither()
                     .mapLeft { WrongArgumentError(it, cmdLineOptionsList) }
-                    .map(this::getConfiguration)
+                    .map(::getConfiguration)
                     .flatMap {
                         it.toEither {
                             WrongArgumentError(
index f3749b3..3e4814f 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.commandline
 
-import arrow.core.Option
 import org.apache.commons.cli.HelpFormatter
 import org.apache.commons.cli.Options
 
@@ -53,17 +52,15 @@ data class WrongArgumentError(
     private fun getOptions() = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption)
 
     companion object {
-        fun generateRequiredParametersNote(cmdLineOptionsList: List<CommandLineOption>): String {
-            val requiredParams = Option.fromNullable(cmdLineOptionsList.filter { it.required }
-                    .takeUnless { it.isEmpty() })
-            return requiredParams.fold(
-                    { "" },
-                    {
-                        it.map { commandLineOption -> commandLineOption.option.opt }
+        fun generateRequiredParametersNote(cmdLineOptionsList: List<CommandLineOption>): String =
+                cmdLineOptionsList.filter { it.required }.let { requiredParams ->
+                    if (requiredParams.isEmpty())
+                        ""
+                    else
+                        requiredParams.map { commandLineOption -> commandLineOption.option.opt }
                                 .joinToString(prefix = "Required parameters: ", separator = ", ")
-                    }
-            )
-        }
+                }
+
     }
 
 }
index 48cac69..6d8ba3f 100644 (file)
@@ -21,10 +21,10 @@ package org.onap.dcae.collectors.veshv.commandline
 
 import arrow.core.Option
 import arrow.core.getOrElse
-import arrow.effects.IO
 import arrow.syntax.function.curried
 import org.apache.commons.cli.CommandLine
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain
 
 /**
@@ -34,10 +34,11 @@ import org.onap.dcae.collectors.veshv.utils.arrow.fromNullablesChain
 
 val handleWrongArgumentErrorCurried = ::handleWrongArgumentError.curried()
 
-fun handleWrongArgumentError(programName: String, err: WrongArgumentError): IO<Unit> = IO {
+fun handleWrongArgumentError(programName: String, err: WrongArgumentError): ExitCode {
     err.printMessage()
     err.printHelp(programName)
-}.flatMap { ExitFailure(2).io() }
+    return ExitFailure(2)
+}
 
 fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
         longValue(cmdLineOpt).getOrElse { default }
index e15592f..e1e35d8 100644 (file)
             <version>${project.parent.version}</version>
             <scope>compile</scope>
         </dependency>
-        <dependency>
-            <groupId>${project.parent.groupId}</groupId>
-            <artifactId>hv-collector-health-check</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>hv-collector-utils</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
-
         <dependency>
-            <groupId>org.jetbrains.kotlin</groupId>
-            <artifactId>kotlin-reflect</artifactId>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-core-data</artifactId>
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-core</artifactId>
+            <artifactId>arrow-extras-data</artifactId>
         </dependency>
         <dependency>
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-core</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.projectreactor.addons</groupId>
-            <artifactId>reactor-extra</artifactId>
-        </dependency>
         <dependency>
             <groupId>io.projectreactor.netty</groupId>
             <artifactId>reactor-netty</artifactId>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-            <artifactId>cbs-client</artifactId>
-        </dependency>
     </dependencies>
 
 </project>
index 86103d0..c846197 100644 (file)
@@ -67,7 +67,7 @@
         </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
-            <artifactId>hv-collector-xnf-simulator</artifactId>
+            <artifactId>hv-collector-ves-message-generator</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
index f1b1ba2..a32d445 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.tests.fakes
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
index 832a578..8bedc88 100644 (file)
         </profile>
     </profiles>
     <dependencies>
-        <dependency>
-            <groupId>${project.parent.groupId}</groupId>
-            <artifactId>hv-collector-domain</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>hv-collector-commandline</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects-instances</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects-reactor</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-syntax</artifactId>
-        </dependency>
         <dependency>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java-util</artifactId>
-        </dependency>
         <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
index 28866f3..93c12d2 100644 (file)
@@ -22,9 +22,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 import arrow.core.getOrElse
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
 import java.io.InputStream
-import java.lang.IllegalArgumentException
 import java.util.concurrent.atomic.AtomicReference
 
 /**
index 5d2977e..f3fd56b 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.http.*
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendOrError
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Mono
 import reactor.netty.http.server.HttpServer
@@ -50,14 +54,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
         )
     }
 
-    fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
-            IO {
+    fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
+            Mono.defer {
                 simulator.listenToTopics(kafkaTopics)
                 HttpServer.create()
                         .host(socketAddress.hostName)
                         .port(socketAddress.port)
                         .route(::setRoutes)
-                        .let { NettyServerHandle(it.bindNow()) }
+                        .bind()
+                        .map { NettyServerHandle(it) }
             }
 
     private fun setRoutes(route: HttpServerRoutes) {
@@ -66,7 +71,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                     req
                             .receive().aggregate().asString()
                             .flatMap {
-                               res.sendOrError{ simulator.listenToTopics(it) }
+                                res.sendOrError { simulator.listenToTopics(it) }
                             }
                 }
                 .delete("/messages") { _, res ->
index 4ad9271..7f4e62b 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
@@ -27,35 +26,29 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValid
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
 
 private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
 private val logger = Logger(PACKAGE_NAME)
 const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
 
-fun main(args: Array<String>) =
+fun main(args: Array<String>): Unit =
         ArgDcaeAppSimConfiguration().parse(args)
-                .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-                .map(::startApp)
-                .unsafeRunEitherSync(
-                        { ex ->
-                            logger.withError { log("Failed to start a server", ex) }
-                            ExitFailure(1)
-                        },
-                        {
-                            logger.info { "Started DCAE-APP Simulator API server" }
-                        }
-                )
+                .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startApp)
+                .let(ExitCode::doExit)
 
-private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
-    logger.info { "Using configuration: $config" }
+
+private fun startApp(config: DcaeAppSimConfiguration): ExitSuccess {
+    logger.info { "Starting DCAE-APP Simulator API server with configuration: $config" }
     val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
     val generatorFactory = MessageGeneratorFactory(config.maxPayloadSizeBytes)
     val messageStreamValidation = MessageStreamValidation(generatorFactory.createVesEventGenerator())
-    return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
+    DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
             .start(config.apiAddress, config.kafkaTopics)
             .flatMap { it.await() }
+            .block()
+    return ExitSuccess
 }
index 63fee2d..40e7c93 100644 (file)
@@ -71,7 +71,7 @@
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-core</artifactId>
+            <artifactId>arrow-core-data</artifactId>
         </dependency>
         <dependency>
             <groupId>org.assertj</groupId>
index 6891593..90ec958 100644 (file)
             <groupId>io.projectreactor.netty</groupId>
             <artifactId>reactor-netty</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects</artifactId>
-        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
             <scope>test</scope>
         </dependency>
-
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-test</artifactId>
index fb5bb9a..cff8160 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.healthcheck.factory
 
-import arrow.effects.IO
 import io.netty.handler.codec.http.HttpResponseStatus
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -45,9 +44,9 @@ class HealthCheckApiServer(private val healthState: HealthState,
 
     private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
 
-    fun start(): IO<ServerHandle> = IO {
+    fun start(): Mono<ServerHandle> = Mono.defer {
         healthState().subscribe(healthDescription::set)
-        val ctx = HttpServer.create()
+        HttpServer.create()
                 .tcpConfiguration {
                     it.addressSupplier { listenAddress }
                             .doOnUnbound { logClose() }
@@ -57,7 +56,9 @@ class HealthCheckApiServer(private val healthState: HealthState,
                     routes.get("/health/alive", ::livenessHandler)
                     routes.get("/monitoring/prometheus", ::monitoringHandler)
                 }
-        NettyServerHandle(ctx.bindNow())
+                .bind()
+                .map { NettyServerHandle(it) }
+
     }
 
     private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
index 57f21a6..3fe8932 100644 (file)
@@ -93,7 +93,7 @@
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-core</artifactId>
+            <artifactId>arrow-core-data</artifactId>
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
index 8b0a38b..dfb388d 100644 (file)
@@ -50,7 +50,7 @@ fun main(args: Array<String>) {
         }
     }
 
-    HealthCheckServer.start(configurationModule.healthCheckPort(args))
+    HealthCheckServer.start(configurationModule.healthCheckPort(args)).block()
     configurationModule
             .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
             .publishOn(Schedulers.single(Schedulers.elastic()))
index 9b58dcc..c970e5c 100644 (file)
@@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.net.InetSocketAddress
 
@@ -39,8 +38,7 @@ object HealthCheckServer {
     fun start(port: Int) =
             createHealthCheckServer(port)
                     .start()
-                    .then(::logServerStarted)
-                    .unsafeRunSync()
+                    .doOnSuccess(::logServerStarted)
 
     private fun createHealthCheckServer(listenPort: Int) =
             HealthCheckApiServer(
index a4bc7c7..0ba609e 100644 (file)
 
         <dependency>
             <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-syntax</artifactId>
+            <artifactId>arrow-core-data</artifactId>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
index e85b8ee..5053cf0 100644 (file)
             <artifactId>kotlin-reflect</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-instances-data</artifactId>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-stdlib-jdk8</artifactId>
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects</artifactId>
+            <artifactId>arrow-typeclasses</artifactId>
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects-instances</artifactId>
+            <artifactId>arrow-core-extensions</artifactId>
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-syntax</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.jetbrains.kotlinx</groupId>
-            <artifactId>kotlinx-coroutines-core</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
index 47b3d55..cfed7f3 100644 (file)
@@ -23,16 +23,11 @@ import arrow.core.Either
 import arrow.core.ForOption
 import arrow.core.Option
 import arrow.core.Try
+import arrow.core.extensions.option.monad.monad
 import arrow.core.fix
 import arrow.core.identity
-import arrow.effects.ForIO
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monad.monad
-import arrow.instances.option.monad.monad
 import arrow.syntax.collections.firstOption
 import arrow.typeclasses.MonadContinuation
-import arrow.typeclasses.binding
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import java.util.concurrent.atomic.AtomicReference
@@ -47,11 +42,6 @@ object OptionUtils {
             : Option<A> = Option.monad().binding(c).fix()
 }
 
-object IOUtils {
-    fun <A> binding(c: suspend MonadContinuation<ForIO, *>.() -> A)
-            : IO<A> = IO.monad().binding(c).fix()
-}
-
 fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
 
 fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
index 9023528..ac39100 100644 (file)
@@ -28,7 +28,8 @@ sealed class Marker(internal val slf4jMarker: org.slf4j.Marker, val mdc: Map<Str
     object Entry : Marker(ENTRY)
     object Exit : Marker(EXIT)
 
-    class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) : Marker(INVOKE, mdc(id, timestamp)) {
+    class Invoke(id: UUID = UUID.randomUUID(), timestamp: Instant = Instant.now()) :
+            Marker(INVOKE, mdc(id, timestamp)) {
         companion object {
             private fun mdc(id: UUID, timestamp: Instant) = mapOf(
                     OnapMdc.INVOCATION_ID to id.toString(),
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.utils.arrow
+package org.onap.dcae.collectors.veshv.utils.process
 
-import arrow.core.Either
-import arrow.core.Left
-import arrow.core.Right
-import arrow.effects.IO
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.toMono
 import kotlin.system.exitProcess
 
 /**
@@ -37,9 +29,8 @@ import kotlin.system.exitProcess
 sealed class ExitCode {
     abstract val code: Int
 
-    fun io() = IO {
-        exitProcess(code)
-    }
+    fun doExit(): Nothing = exitProcess(code)
+
 }
 
 object ExitSuccess : ExitCode() {
@@ -47,33 +38,3 @@ object ExitSuccess : ExitCode() {
 }
 
 data class ExitFailure(override val code: Int) : ExitCode()
-
-inline fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
-        flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
-
-fun IO<Any>.unit() = map { Unit }
-
-fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
-    subscribe({
-        callback(Right(it))
-    }, {
-        callback(Left(it))
-    })
-}
-
-fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
-        toMono().then(Mono.fromCallable(callback))
-
-fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
-        flatMap { io ->
-            io.attempt().unsafeRunSync().fold(
-                    { Flux.error<T>(it) },
-                    { Flux.just<T>(it) }
-            )
-        }
-
-inline fun <T> IO<T>.then(crossinline block: (T) -> Unit): IO<T> =
-        map {
-            block(it)
-            it
-        }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
new file mode 100644 (file)
index 0000000..ceccbcb
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+package org.onap.dcae.collectors.veshv.utils.rx
+
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
+
+fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
+        toMono().then(Mono.fromCallable(callback))
index 670ab4a..728d62b 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.utils
 
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Mono
 import reactor.netty.DisposableServer
@@ -29,7 +28,7 @@ import reactor.netty.DisposableServer
  * @since August 2018
  */
 abstract class ServerHandle(val host: String, val port: Int) : Closeable {
-    abstract fun await(): IO<Unit>
+    abstract fun await(): Mono<Void>
 }
 
 /**
@@ -58,8 +57,10 @@ class NettyServerHandle(private val ctx: DisposableServer,
                 }
             }
 
-    override fun await() = IO<Unit> {
-        ctx.channel().closeFuture().sync()
+    override fun await(): Mono<Void> = Mono.create { callback ->
+        ctx.channel().closeFuture().addListener {
+            callback.success()
+        }
     }
 
     companion object {
index c17d29f..a9ac0bc 100644 (file)
     </profiles>
 
     <dependencies>
-        <dependency>
-            <groupId>${project.parent.groupId}</groupId>
-            <artifactId>hv-collector-domain</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>hv-collector-ssl</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.arrow-kt</groupId>
-            <artifactId>arrow-effects-instances</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.kotlinx</groupId>
-            <artifactId>kotlinx-coroutines-core</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-stdlib-jdk8</artifactId>
             <artifactId>logback-classic</artifactId>
             <scope>runtime</scope>
         </dependency>
-        <!-- See comment in main pom
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-transport-native-epoll</artifactId>
-            <classifier>${os.detected.classifier}</classifier>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-tcnative-boringssl-static</artifactId>
-            <classifier>${os.detected.classifier}</classifier>
-        </dependency>
-        -->
         <dependency>
             <groupId>org.glassfish</groupId>
             <artifactId>javax.json</artifactId>
index 93c4317..49d6a47 100644 (file)
@@ -22,13 +22,10 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 import arrow.core.Either
 import arrow.core.Some
 import arrow.core.Try
+import arrow.core.extensions.either.monad.monad
 import arrow.core.fix
-import arrow.effects.IO
-import arrow.instances.either.monad.monad
-import arrow.typeclasses.binding
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
@@ -60,7 +57,7 @@ class XnfSimulator(
 
     private val defaultHvVesClient by lazy { clientFactory.create() }
 
-    fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+    fun startSimulation(messageParameters: InputStream): Either<ParsingError, Mono<Void>> =
             Either.monad<ParsingError>().binding {
                 val json = parseJsonArray(messageParameters).bind()
                 val parameters = messageParametersParser.parse(json).bind()
@@ -73,7 +70,7 @@ class XnfSimulator(
                     .mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
 
 
-    private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+    private fun simulationFrom(parameters: List<MessageParameters>): Mono<Void> =
             parameters
                     .map(::asClientToMessages)
                     .groupMessagesByClients()
@@ -81,8 +78,7 @@ class XnfSimulator(
                     .toList()
                     .toFlux()
                     .map(::simulate)
-                    .then(Mono.just(Unit))
-                    .asIo()
+                    .then()
 
     private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
             groupBy({ it.first }, { it.second })
index 1957943..e50f1e7 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
 
-import org.onap.dcae.collectors.veshv.utils.arrow.then
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.rx.then
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
index fb2c532..e68dd44 100644 (file)
@@ -20,7 +20,6 @@
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
 
 import arrow.core.Either
-import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -48,13 +47,14 @@ internal class XnfApiServer(
         private val xnfSimulator: XnfSimulator,
         private val ongoingSimulations: OngoingSimulations) {
 
-    fun start(socketAddress: InetSocketAddress): IO<ServerHandle> = IO {
-        HttpServer.create()
-                .host(socketAddress.hostName)
-                .port(socketAddress.port)
-                .route(::setRoutes)
-                .let { NettyServerHandle(it.bindNow()) }
-    }
+    fun start(socketAddress: InetSocketAddress): Mono<ServerHandle> =
+            HttpServer.create()
+                    .host(socketAddress.hostName)
+                    .port(socketAddress.port)
+                    .route(::setRoutes)
+                    .bind()
+                    .map { NettyServerHandle(it) }
+
 
     private fun setRoutes(route: HttpServerRoutes) {
         route
index 5e1c979..a2501eb 100644 (file)
@@ -32,7 +32,7 @@ import reactor.core.publisher.Mono
 internal class XnfHealthCheckServer {
     fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config)
             .start()
-            .map { logger.info(serverStartedMessage(it)); it }
+            .doOnNext { logger.info(serverStartedMessage(it)) }
 
     private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer {
         val monitoring = object : PrometheusMetricsProvider {
index fb71b2c..3f43ebe 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
-import arrow.effects.IO
-import kotlinx.coroutines.asCoroutineDispatcher
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Scheduler
+import reactor.core.scheduler.Schedulers
 import java.util.*
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Executor
-import java.util.concurrent.Executors
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(),
                          private val healthState: HealthState = HealthState.INSTANCE) {
-    private val asyncSimulationContext = executor.asCoroutineDispatcher()
     private val simulations = ConcurrentHashMap<UUID, Status>()
 
-    fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+    fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID {
         val id = UUID.randomUUID()
         simulations[id] = StatusOngoing
         updateHealthState()
 
-        simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
-            result.fold(
-                    { err ->
-                        logger.withWarn { log("Error", err) }
-                        simulations[id] = StatusFailure(err)
-                    },
-                    {
-                        logger.info { "Finished sending messages" }
-                        simulations[id] = StatusSuccess
-                    }
-            ).also { updateHealthState() }
-        }
+        simulationIo
+                .publishOn(scheduler)
+                .doOnSuccess {
+                    logger.info { "Finished sending messages" }
+                    simulations[id] = StatusSuccess
+                }
+                .doOnError { err ->
+                    logger.withWarn { log("Error", err) }
+                    simulations[id] = StatusFailure(err)
+                }
+                .doFinally { updateHealthState() }
+                .subscribe()
+
         return id
     }
 
index a1042f3..4fcb180 100644 (file)
@@ -19,8 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf
 
-import arrow.effects.IO
 import io.vavr.collection.HashSet
+import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentError
 import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -32,9 +32,8 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulator
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.IOUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
+import org.onap.dcae.collectors.veshv.utils.process.ExitCode
+import org.onap.dcae.collectors.veshv.utils.process.ExitSuccess
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
 
@@ -46,36 +45,29 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
-        .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-        .map(::startServers)
-        .unsafeRunEitherSync(
-                { ex ->
-                    logger.withError { log("Failed to start a server", ex) }
-                    ExitFailure(1)
-                },
-                {
-                    logger.info { "Stopping xNF Simulator API server" }
-                }
-        )
+fun main(args: Array<String>): Unit =
+        ArgXnfSimulatorConfiguration().parse(args)
+                .fold(handleWrongArgumentErrorCurried(PROGRAM_NAME), ::startServers)
+                .let(ExitCode::doExit)
 
-private fun startServers(config: SimulatorConfiguration): IO<Unit> =
-        binding {
-            logger.info { "Using configuration: $config" }
+private fun startServers(config: SimulatorConfiguration): ExitCode {
+    logger.info { "Using configuration: $config" }
 
-            XnfHealthCheckServer().startServer(config).bind()
+    XnfHealthCheckServer().startServer(config).block()
 
-            val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
-            val xnfSimulator = XnfSimulator(
-                    ClientFactory(clientConfig),
-                    MessageGeneratorFactory(config.maxPayloadSizeBytes)
-            )
-            val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
-                    .start(config.listenAddress).bind()
+    val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
+    val xnfSimulator = XnfSimulator(
+            ClientFactory(clientConfig),
+            MessageGeneratorFactory(config.maxPayloadSizeBytes)
+    )
+    val xnfApiServerHandler = XnfApiServer(xnfSimulator, OngoingSimulations())
+            .start(config.listenAddress)
+            .block()
 
-            logger.info { "Started xNF Simulator API server" }
-            HealthState.INSTANCE.changeState(HealthDescription.IDLE)
+    logger.info { "Started xNF Simulator API server" }
+    HealthState.INSTANCE.changeState(HealthDescription.IDLE)
 
-            xnfApiServerHandler.await().bind()
-        }
+    xnfApiServerHandler.await().block()
+    return ExitSuccess
+}
 
index 113c3c4..325d3bb 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.main
 
-import arrow.effects.IO
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
@@ -32,16 +31,17 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
 import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
 import java.util.*
-import java.util.concurrent.Executors
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since September 2018
  */
 internal class OngoingSimulationsTest : Spek({
-    val executor = Executors.newSingleThreadExecutor()
-    val cut = OngoingSimulations(executor)
+    val scheduler = Schedulers.single()
+    val cut = OngoingSimulations(scheduler)
 
     describe("simulations repository") {
         given("not existing task task id") {
@@ -121,19 +121,22 @@ internal class OngoingSimulationsTest : Spek({
         }
 
         afterGroup {
-            executor.shutdown()
+            scheduler.dispose()
         }
     }
 
     afterEachTest { cut.clear() }
 })
 
-private fun neverendingTask() = IO.async<Unit> { }
+private fun neverendingTask() = Mono.never<Void>()
 
-private fun succesfulTask(): IO<Unit> = IO { println("great success!") }
+private fun succesfulTask(): Mono<Void> = Mono.empty<Void>()
+        .doOnSuccess {
+            println("great success")
+        }
 
-private fun failingTask(): Pair<RuntimeException, IO<Unit>> {
+private fun failingTask(): Pair<RuntimeException, Mono<Void>> {
     val cause = RuntimeException("facepalm")
-    val task = IO.raiseError<Unit>(cause)
+    val task = Mono.error<Void>(cause)
     return Pair(cause, task)
 }
index 29281cd..ea0628c 100644 (file)
@@ -126,7 +126,7 @@ internal class XnfSimulatorTest : Spek({
             whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit))
 
             // when
-            cut.startSimulation(json).map { it.unsafeRunSync() }
+            cut.startSimulation(json).map { it.block() }
 
             // then
             verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF))