Add log diagnostic context
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / socket / NettyTcpServer.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.socket
21
22 import arrow.core.getOrElse
23 import arrow.effects.IO
24 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
25 import org.onap.dcae.collectors.veshv.boundary.Server
26 import org.onap.dcae.collectors.veshv.model.ClientContext
27 import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
28 import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
29 import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
30 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
31 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
32 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
33 import org.onap.dcae.collectors.veshv.utils.ServerHandle
34 import org.onap.dcae.collectors.veshv.utils.logging.Logger
35 import reactor.core.publisher.Mono
36 import reactor.netty.ByteBufFlux
37 import reactor.netty.Connection
38 import reactor.netty.NettyInbound
39 import reactor.netty.NettyOutbound
40 import reactor.netty.tcp.TcpServer
41 import java.time.Duration
42
43 /**
44  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
45  * @since May 2018
46  */
47 internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
48                               private val sslContextFactory: ServerSslContextFactory,
49                               private val collectorProvider: CollectorProvider) : Server {
50
51     override fun start(): IO<ServerHandle> = IO {
52         val tcpServer = TcpServer.create()
53                 .addressSupplier { serverConfig.serverListenAddress }
54                 .configureSsl()
55                 .handle(this::handleConnection)
56
57         NettyServerHandle(tcpServer.bindNow())
58     }
59
60     private fun TcpServer.configureSsl() =
61             sslContextFactory
62                     .createSslContext(serverConfig.securityConfiguration)
63                     .map { sslContext ->
64                         logger.info { "Collector configured with SSL enabled" }
65                         this.secure { b -> b.sslContext(sslContext) }
66                     }.getOrElse {
67                         logger.info { "Collector configured with SSL disabled" }
68                         this
69                     }
70
71     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
72         val clientContext = ClientContext(nettyOutbound.alloc())
73         nettyInbound.withConnection {
74             clientContext.clientAddress = it.address()
75         }
76
77         return collectorProvider(clientContext).fold(
78                 {
79                     logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
80                     Mono.empty()
81                 },
82                 {
83                     logger.info { "Handling new connection" }
84                     nettyInbound.withConnection { conn ->
85                         conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
86                                 .logConnectionClosed(clientContext)
87                     }
88                     it.handleConnection(createDataStream(nettyInbound))
89                 }
90         )
91     }
92
93     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
94             .receive()
95             .retain()
96
97     private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
98         onReadIdle(timeout.toMillis()) {
99             logger.info(ctx) {
100                 "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
101             }
102             disconnectClient(ctx)
103         }
104         return this
105     }
106
107     private fun Connection.disconnectClient(ctx: ClientContext) {
108         channel().close().addListener {
109             if (it.isSuccess)
110                 logger.debug(ctx) { "Channel closed successfully." }
111             else
112                 logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
113         }
114     }
115
116     private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
117         onTerminate().subscribe {
118             logger.info(ctx) { "Connection has been closed" }
119         }
120         return this
121     }
122
123     companion object {
124         private val logger = Logger(NettyTcpServer::class)
125     }
126 }