Get rid of arrow-effects usage
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-xnf-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / xnf / impl / simulations.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
21
22 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
25 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
26 import org.onap.dcae.collectors.veshv.utils.logging.Logger
27 import reactor.core.publisher.Mono
28 import reactor.core.scheduler.Scheduler
29 import reactor.core.scheduler.Schedulers
30 import java.util.*
31 import java.util.concurrent.ConcurrentHashMap
32
33 /**
34  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
35  * @since August 2018
36  */
37 class OngoingSimulations(private val scheduler: Scheduler = Schedulers.elastic(),
38                          private val healthState: HealthState = HealthState.INSTANCE) {
39     private val simulations = ConcurrentHashMap<UUID, Status>()
40
41     fun startAsynchronousSimulation(simulationIo: Mono<Void>): UUID {
42         val id = UUID.randomUUID()
43         simulations[id] = StatusOngoing
44         updateHealthState()
45
46         simulationIo
47                 .publishOn(scheduler)
48                 .doOnSuccess {
49                     logger.info { "Finished sending messages" }
50                     simulations[id] = StatusSuccess
51                 }
52                 .doOnError { err ->
53                     logger.withWarn { log("Error", err) }
54                     simulations[id] = StatusFailure(err)
55                 }
56                 .doFinally { updateHealthState() }
57                 .subscribe()
58
59         return id
60     }
61
62     private fun updateHealthState() = healthState.changeState(currentState())
63
64     private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE
65
66     internal fun isAnySimulationPending() = simulations.any {
67         status(it.key) is StatusOngoing
68     }
69
70     fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
71
72     internal fun clear() = simulations.clear()
73
74     companion object {
75         private val logger = Logger(XnfApiServer::class)
76     }
77 }
78
79 sealed class Status(val message: String) {
80     override fun toString() = this::class.simpleName ?: "null"
81 }
82
83 object StatusNotFound : Status("not found")
84 object StatusOngoing : Status("ongoing")
85 object StatusSuccess : Status("success")
86 data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}")