Get rid of arrow-effects usage
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-xnf-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / xnf / impl / simulations.kt
index fb71b2c..3f43ebe 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
-import arrow.effects.IO
-import kotlinx.coroutines.asCoroutineDispatcher
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Scheduler
+import reactor.core.scheduler.Schedulers
 import java.util.*
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Executor
-import java.util.concurrent.Executors
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(),
                          private val healthState: HealthState = HealthState.INSTANCE) {
-    private val asyncSimulationContext = executor.asCoroutineDispatcher()
     private val simulations = ConcurrentHashMap<UUID, Status>()
 
-    fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+    fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID {
         val id = UUID.randomUUID()
         simulations[id] = StatusOngoing
         updateHealthState()
 
-        simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
-            result.fold(
-                    { err ->
-                        logger.withWarn { log("Error", err) }
-                        simulations[id] = StatusFailure(err)
-                    },
-                    {
-                        logger.info { "Finished sending messages" }
-                        simulations[id] = StatusSuccess
-                    }
-            ).also { updateHealthState() }
-        }
+        simulationIo
+                .publishOn(scheduler)
+                .doOnSuccess {
+                    logger.info { "Finished sending messages" }
+                    simulations[id] = StatusSuccess
+                }
+                .doOnError { err ->
+                    logger.withWarn { log("Error", err) }
+                    simulations[id] = StatusFailure(err)
+                }
+                .doFinally { updateHealthState() }
+                .subscribe()
+
         return id
     }