*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-import arrow.effects.IO
-import kotlinx.coroutines.asCoroutineDispatcher
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.core.scheduler.Scheduler
+import reactor.core.scheduler.Schedulers
import java.util.*
import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.Executor
-import java.util.concurrent.Executors
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(),
private val healthState: HealthState = HealthState.INSTANCE) {
- private val asyncSimulationContext = executor.asCoroutineDispatcher()
private val simulations = ConcurrentHashMap<UUID, Status>()
- fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+ fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID {
val id = UUID.randomUUID()
simulations[id] = StatusOngoing
updateHealthState()
- simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
- result.fold(
- { err ->
- logger.withWarn { log("Error", err) }
- simulations[id] = StatusFailure(err)
- },
- {
- logger.info { "Finished sending messages" }
- simulations[id] = StatusSuccess
- }
- ).also { updateHealthState() }
- }
+ simulationIo
+ .publishOn(scheduler)
+ .doOnSuccess {
+ logger.info { "Finished sending messages" }
+ simulations[id] = StatusSuccess
+ }
+ .doOnError { err ->
+ logger.withWarn { log("Error", err) }
+ simulations[id] = StatusFailure(err)
+ }
+ .doFinally { updateHealthState() }
+ .subscribe()
+
return id
}