2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
22 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
25 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
26 import org.onap.dcae.collectors.veshv.utils.logging.Logger
27 import reactor.core.publisher.Mono
28 import reactor.core.scheduler.Scheduler
29 import reactor.core.scheduler.Schedulers
31 import java.util.concurrent.ConcurrentHashMap
34 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
37 class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(),
38 private val healthState: HealthState = HealthState.INSTANCE) {
39 private val simulations = ConcurrentHashMap<UUID, Status>()
41 fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID {
42 val id = UUID.randomUUID()
43 simulations[id] = StatusOngoing
49 logger.info { "Finished sending messages" }
50 simulations[id] = StatusSuccess
53 logger.withWarn { log("Error", err) }
54 simulations[id] = StatusFailure(err)
56 .doFinally { updateHealthState() }
62 private fun updateHealthState() = healthState.changeState(currentState())
64 private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE
66 internal fun isAnySimulationPending() = simulations.any {
67 status(it.key) is StatusOngoing
70 fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
72 internal fun clear() = simulations.clear()
75 private val logger = Logger(XnfApiServer::class)
79 sealed class Status(val message: String) {
80 override fun toString() = this::class.simpleName ?: "null"
83 object StatusNotFound : Status("not found")
84 object StatusOngoing : Status("ongoing")
85 object StatusSuccess : Status("success")
86 data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}")