2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
22 import arrow.effects.IO
23 import kotlinx.coroutines.asCoroutineDispatcher
24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
25 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
26 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
27 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
28 import org.onap.dcae.collectors.veshv.utils.logging.Logger
30 import java.util.concurrent.ConcurrentHashMap
31 import java.util.concurrent.Executor
32 import java.util.concurrent.Executors
35 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
/**
 * Registry of simulations that are executed asynchronously.
 *
 * Each started simulation is stored under a random [UUID] together with its current [Status],
 * which can be read back with [status]. Whenever the set of tracked statuses changes because a
 * simulation starts or finishes, [healthState] is switched to `BUSY` if any tracked simulation is
 * still [StatusOngoing] and to `IDLE` otherwise.
 *
 * @param executor executor converted into the coroutine dispatcher that simulations continue on
 * @param healthState receiver of the `BUSY`/`IDLE` state changes
 */
class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
                         private val healthState: HealthState = HealthState.INSTANCE) {
    private val asyncSimulationContext = executor.asCoroutineDispatcher()
    private val simulations = ConcurrentHashMap<UUID, Status>()

    /**
     * Registers a new simulation as ongoing and runs [simulationIo] asynchronously.
     *
     * When the IO completes, the stored status is replaced with [StatusFailure] (carrying the
     * error) or [StatusSuccess], and the health state is recomputed.
     *
     * @param simulationIo the simulation to run
     * @return identifier under which the simulation status can be queried via [status]
     */
    fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
        val id = UUID.randomUUID()
        simulations[id] = StatusOngoing
        // Publish BUSY immediately; without this the health state would only be
        // recomputed in the completion callback below.
        updateHealthState()

        simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
            result.fold(
                    { err ->
                        logger.withWarn { log("Error", err) }
                        simulations[id] = StatusFailure(err)
                    },
                    {
                        logger.info { "Finished sending messages" }
                        simulations[id] = StatusSuccess
                    }
            ).also { updateHealthState() }
        }
        return id
    }

    /** Pushes the state derived from the tracked simulations to [healthState]. */
    private fun updateHealthState() = healthState.changeState(currentState())

    /** `BUSY` while at least one simulation is ongoing, `IDLE` otherwise. */
    private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE

    /** Returns `true` when at least one tracked simulation still has the [StatusOngoing] status. */
    internal fun isAnySimulationPending(): Boolean = simulations.values.any { it is StatusOngoing }

    /**
     * Looks up the status of the simulation registered under [id].
     *
     * @return the stored status, or [StatusNotFound] when no simulation is tracked under [id]
     */
    fun status(id: UUID): Status = simulations.getOrDefault(id, StatusNotFound)

    /** Forgets all tracked simulations. */
    internal fun clear() = simulations.clear()

    companion object {
        // Bound to this class so that log entries are attributed to OngoingSimulations.
        private val logger = Logger(OngoingSimulations::class)
    }
}
/**
 * Closed set of states stored for a simulation identifier.
 *
 * @property message short textual description of the state
 */
sealed class Status(val message: String) {
    /** Yields the simple class name of the concrete status, or the text "null" if it has none. */
    override fun toString(): String {
        val simpleName = this::class.simpleName
        return simpleName ?: "null"
    }
}

/** Status with the message "not found". */
object StatusNotFound : Status("not found")

/** Status with the message "ongoing". */
object StatusOngoing : Status("ongoing")

/** Status with the message "success". */
object StatusSuccess : Status("success")

/**
 * Status carrying the error of a failed simulation.
 *
 * @property cause the error; its message (possibly `null`) is appended to "Error " to form [message]
 */
data class StatusFailure(val cause: Throwable) : Status("Error " + cause.message)