2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.main
22 import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
23 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
24 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
25 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
26 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
27 import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
28 import org.onap.dcae.collectors.veshv.main.servers.VesServer
29 import org.onap.dcae.collectors.veshv.model.ServiceContext
30 import org.onap.dcae.collectors.veshv.utils.ServerHandle
31 import org.onap.dcae.collectors.veshv.utils.logging.Logger
32 import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
33 import reactor.core.publisher.Mono
34 import reactor.core.scheduler.Schedulers
35 import java.time.Duration
36 import java.util.concurrent.atomic.AtomicReference
// Root package name of the collector; used to build the logger name below and
// as the target passed to Logger.setLogLevel in deferredVesServer.
39 private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
// Logger for this file, named "<root package>.main".
40 private val logger = Logger("$VES_HV_PACKAGE.main")
// Holder for the server handle; created empty (null) and read by stopRunningServer().
// NOTE(review): the place where it is assigned is not visible in this excerpt.
42 private val hvVesServer = AtomicReference<ServerHandle>()
// Provides the health-check port in main().
// NOTE(review): presumably also the receiver of hvVesConfigurationUpdates(...) - confirm.
43 private val configurationModule = ConfigurationModule()
// Upper bound (10 s) used both as the timeout in startServer and as the
// blocking wait in shutdownGracefully.
44 private val maxCloseTime = Duration.ofSeconds(10)
/**
 * Entry point of the veshv collector process.
 *
 * Visible steps, in order:
 *  - builds a [ConfigurationStateListener] that reports the retrying state to [HealthState];
 *  - starts the health-check server and blocks until that start completes;
 *  - consumes configuration updates on a dedicated scheduler, passing each one to
 *    [startServer] and reporting start failures through [logServerStartFailed].
 *
 * NOTE(review): several lines of this function (the receiver of the reactive chain,
 * the operators enclosing the logging statements, and the end of the chain) are not
 * visible in this excerpt; the comments below only describe what is shown.
 *
 * @param args command line arguments, forwarded to the configuration module
 */
46 fun main(args: Array<String>) {
// Listener that marks the service as retrying for dynamic configuration.
47 val configStateListener = object : ConfigurationStateListener {
48 override fun retrying() {
49 HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
// Blocks until the health-check server start has completed.
53 HealthCheckServer.start(configurationModule.healthCheckPort(args)).block()
55 .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
// Subsequent signals are handled on a single worker taken from the elastic scheduler.
56 .publishOn(Schedulers.single(Schedulers.elastic()))
// NOTE(review): presumably runs inside an on-next callback (enclosing operator not visible) - confirm.
58 logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
59 HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
// NOTE(review): presumably runs inside an on-error callback where `it` is the failure - confirm.
62 logger.error(ServiceContext::mdc) { "Failed to create configuration: ${it.message}" }
63 logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
64 HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
// Each configuration is handed to startServer; failures of that step are logged.
66 .flatMap(::startServer)
67 .doOnError(::logServerStartFailed)
/**
 * Builds the pipeline that brings up a server for the given configuration.
 *
 * The visible part applies [maxCloseTime] as a timeout to the preceding step, then
 * continues with [deferredVesServer], and registers a shutdown hook that calls
 * [shutdownGracefully].
 *
 * NOTE(review): the head of the chain and the operator enclosing the shutdown-hook
 * registration are not visible in this excerpt; presumably the chain starts with
 * stopRunningServer() and `it` is the freshly started handle - confirm.
 *
 * @param config configuration to start the server with
 * @return a [Mono] of the [ServerHandle]
 */
72 private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
// Fails the preceding step if it takes longer than maxCloseTime.
74 .timeout(maxCloseTime)
// The preceding step's value is ignored; only its completion triggers the new server start.
75 .then(deferredVesServer(config))
77 registerShutdownHook { shutdownGracefully(it) }
/**
 * Returns a deferred [Mono] that starts the VES server for [config].
 *
 * Because the body is wrapped in `Mono.defer`, the log level change, the debug
 * log of the configuration and the call to `VesServer.start` happen on subscription,
 * not when this function is called.
 *
 * @param config configuration supplying the log level and passed to `VesServer.start`
 */
81 private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
// Apply the configured log level to the whole collector package before starting.
82 Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
83 logger.debug(ServiceContext::mdc) { "Configuration: $config" }
84 VesServer.start(config)
/**
 * Returns a deferred [Mono] that closes the server currently held in [hvVesServer].
 *
 * The reference is read on subscription; when it holds no handle an empty [Mono]
 * is used instead.
 */
87 private fun stopRunningServer() = Mono.defer {
// Null-safe: no handle stored yet means there is nothing to close.
88 hvVesServer.get()?.close() ?: Mono.empty()
/**
 * Shuts the given server down, reporting the transition through [healthState].
 *
 * Order of operations: log the start of the shutdown, switch the health state to
 * `SHUTTING_DOWN`, close the server while waiting at most [maxCloseTime], then log
 * completion.
 *
 * @param serverHandle handle of the server to close
 * @param healthState health state holder to update; defaults to the shared instance
 */
91 internal fun shutdownGracefully(serverHandle: ServerHandle,
92 healthState: HealthState = HealthState.INSTANCE) {
93 logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
// Health state is switched before the close begins.
94 healthState.changeState(HealthDescription.SHUTTING_DOWN)
// Blocking wait bounded by maxCloseTime.
// NOTE(review): behaviour when the wait times out is defined by Reactor's Mono.block(Duration) - confirm it is acceptable here.
95 serverHandle.close().block(maxCloseTime)
96 logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
/**
 * Logs a server start failure together with the causing exception.
 *
 * Used as the error callback of the reactive chain in main().
 *
 * @param ex the failure to log
 */
99 private fun logServerStartFailed(ex: Throwable) =
100 logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }