dfb388d81a12ac3a7c7bb477329e5238a8815429
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-main / src / main / kotlin / org / onap / dcae / collectors / veshv / main / main.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
23 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
24 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
25 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
26 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
27 import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
28 import org.onap.dcae.collectors.veshv.main.servers.VesServer
29 import org.onap.dcae.collectors.veshv.model.ServiceContext
30 import org.onap.dcae.collectors.veshv.utils.ServerHandle
31 import org.onap.dcae.collectors.veshv.utils.logging.Logger
32 import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
33 import reactor.core.publisher.Mono
34 import reactor.core.scheduler.Schedulers
35 import java.time.Duration
36 import java.util.concurrent.atomic.AtomicReference
37
38
39 private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
40 private val logger = Logger("$VES_HV_PACKAGE.main")
41
42 private val hvVesServer = AtomicReference<ServerHandle>()
43 private val configurationModule = ConfigurationModule()
44 private val maxCloseTime = Duration.ofSeconds(10)
45
46 fun main(args: Array<String>) {
47     val configStateListener = object : ConfigurationStateListener {
48         override fun retrying() {
49             HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
50         }
51     }
52
53     HealthCheckServer.start(configurationModule.healthCheckPort(args)).block()
54     configurationModule
55             .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
56             .publishOn(Schedulers.single(Schedulers.elastic()))
57             .doOnNext {
58                 logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
59                 HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
60             }
61             .doOnError {
62                 logger.error(ServiceContext::mdc) { "Failed to create configuration: ${it.message}" }
63                 logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
64                 HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
65             }
66             .flatMap(::startServer)
67             .doOnError(::logServerStartFailed)
68             .then()
69             .block()
70 }
71
72 private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
73         stopRunningServer()
74                 .timeout(maxCloseTime)
75                 .then(deferredVesServer(config))
76                 .doOnNext {
77                     registerShutdownHook { shutdownGracefully(it) }
78                     hvVesServer.set(it)
79                 }
80
81 private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
82     Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
83     logger.debug(ServiceContext::mdc) { "Configuration: $config" }
84     VesServer.start(config)
85 }
86
87 private fun stopRunningServer() = Mono.defer {
88     hvVesServer.get()?.close() ?: Mono.empty()
89 }
90
91 internal fun shutdownGracefully(serverHandle: ServerHandle,
92                                 healthState: HealthState = HealthState.INSTANCE) {
93     logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
94     healthState.changeState(HealthDescription.SHUTTING_DOWN)
95     serverHandle.close().block(maxCloseTime)
96     logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
97 }
98
99 private fun logServerStartFailed(ex: Throwable) =
100         logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
101