import arrow.effects.IO
import kotlinx.coroutines.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.*
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+ private val healthState: HealthState = HealthState.INSTANCE) {
private val asyncSimulationContext = executor.asCoroutineDispatcher()
private val simulations = ConcurrentHashMap<UUID, Status>()
fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
val id = UUID.randomUUID()
simulations[id] = StatusOngoing
+ updateHealthState()
simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
result.fold(
{ err ->
- logger.warn("Error", err)
+ logger.withWarn { log("Error", err) }
simulations[id] = StatusFailure(err)
},
{
- logger.info("Finished sending messages")
+ logger.info { "Finished sending messages" }
simulations[id] = StatusSuccess
}
- )
+ ).also { updateHealthState() }
}
return id
}
- fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+ private fun updateHealthState() = healthState.changeState(currentState())
+
+ private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE
- internal fun clear() {
- simulations.clear()
+ internal fun isAnySimulationPending() = simulations.any {
+ status(it.key) is StatusOngoing
}
+ fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+ internal fun clear() = simulations.clear()
+
companion object {
private val logger = Logger(XnfApiServer::class)
}