Creation of server module
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-main / src / main / kotlin / org / onap / dcae / collectors / veshv / main / main.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main
21
22 import org.onap.dcae.collectors.veshv.api.ServersFactory
23 import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
24 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
25 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
26 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
27 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
28 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
29 import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
30 import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
31 import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
32 import org.onap.dcae.collectors.veshv.utils.ServerHandle
33 import org.onap.dcae.collectors.veshv.utils.logging.Logger
34 import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
35 import reactor.core.publisher.Mono
36 import reactor.core.scheduler.Schedulers
37 import java.time.Duration
38 import java.util.concurrent.atomic.AtomicReference
39
40
41 private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
42 private val logger = Logger("$VES_HV_PACKAGE.main")
43
44 private val hvVesServer = AtomicReference<ServerHandle>()
45 private val configurationModule = ConfigurationModule()
46 private val sslContextFactory = SslContextFactory()
47 private val maxCloseTime = Duration.ofSeconds(10)
48
49 fun main(args: Array<String>) {
50     val configStateListener = object : ConfigurationStateListener {
51         override fun retrying() {
52             HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
53         }
54     }
55
56     HealthCheckServer.start(configurationModule.healthCheckPort(args)).block()
57     configurationModule
58             .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
59             .publishOn(Schedulers.single(Schedulers.elastic()))
60             .doOnNext {
61                 logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
62                 HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
63             }
64             .doOnError {
65                 logger.error(ServiceContext::mdc) { "Failed to create configuration: ${it.message}" }
66                 logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
67                 HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
68             }
69             .flatMap(::startServer)
70             .doOnError(::logServerStartFailed)
71             .then()
72             .block()
73 }
74
75 private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> =
76         stopRunningServer()
77                 .timeout(maxCloseTime)
78                 .then(deferredVesServer(config))
79                 .doOnNext {
80                     registerShutdownHook { shutdownGracefully(it) }
81                     hvVesServer.set(it)
82                 }
83
84 private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
85     Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
86     logger.debug(ServiceContext::mdc) { "Configuration: $config" }
87     ServersFactory.createHvVesServer(
88             config,
89             sslContextFactory,
90             MicrometerMetrics.INSTANCE
91     ).start()
92 }
93
94 private fun stopRunningServer() = Mono.defer {
95     hvVesServer.get()?.close() ?: Mono.empty()
96 }
97
98 internal fun shutdownGracefully(serverHandle: ServerHandle,
99                                 healthState: HealthState = HealthState.INSTANCE) {
100     logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
101     healthState.changeState(HealthDescription.SHUTTING_DOWN)
102     serverHandle.close().block(maxCloseTime)
103     logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
104 }
105
106 private fun logServerStartFailed(ex: Throwable) =
107         logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
108