725622f7d2bd8bda448ed024ffe2de8c44f00bc4
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.socket
21
22 import arrow.core.None
23 import arrow.core.Option
24 import arrow.core.getOrElse
25 import arrow.effects.IO
26 import arrow.syntax.collections.firstOption
27 import io.netty.handler.ssl.SslHandler
28 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
29 import org.onap.dcae.collectors.veshv.boundary.Metrics
30 import org.onap.dcae.collectors.veshv.boundary.Server
31 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
32 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
33 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
34 import org.onap.dcae.collectors.veshv.model.ClientContext
35 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
36 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
37 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
38 import org.onap.dcae.collectors.veshv.utils.ServerHandle
39 import org.onap.dcae.collectors.veshv.utils.logging.Logger
40 import org.onap.dcae.collectors.veshv.utils.logging.Marker
41 import reactor.core.publisher.Mono
42 import reactor.netty.ByteBufFlux
43 import reactor.netty.Connection
44 import reactor.netty.NettyInbound
45 import reactor.netty.NettyOutbound
46 import reactor.netty.tcp.TcpServer
47 import java.security.cert.X509Certificate
48 import java.time.Duration
49 import javax.net.ssl.SSLSession
50
51
52 /**
53  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
54  * @since May 2018
55  */
56 internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
57                               private val sslContextFactory: ServerSslContextFactory,
58                               private val collectorProvider: CollectorProvider,
59                               private val metrics: Metrics) : Server {
60
61     override fun start(): IO<ServerHandle> = IO {
62         TcpServer.create()
63                 .addressSupplier { serverConfig.serverListenAddress }
64                 .configureSsl()
65                 .handle(this::handleConnection)
66                 .let { NettyServerHandle(it.bindNow()) }
67     }
68
69     private fun TcpServer.configureSsl() =
70             sslContextFactory
71                     .createSslContext(serverConfig.securityConfiguration)
72                     .map { sslContext ->
73                         logger.info { "Collector configured with SSL enabled" }
74                         this.secure { b -> b.sslContext(sslContext) }
75                     }.getOrElse {
76                         logger.info { "Collector configured with SSL disabled" }
77                         this
78                     }
79
80     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
81         metrics.notifyClientConnected()
82         val clientContext = ClientContext(nettyOutbound.alloc())
83         nettyInbound.withConnection {
84             populateClientContext(clientContext, it)
85             it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
86                 sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
87             }
88         }
89
90         logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
91         return collectorProvider(clientContext).fold(
92                 {
93                     logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
94                     Mono.empty()
95                 },
96                 {
97                     logger.info(clientContext::fullMdc) { "Handling new connection" }
98                     nettyInbound.withConnection { conn ->
99                         conn
100                                 .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
101                                 .logConnectionClosed(clientContext)
102                     }
103                     it.handleConnection(createDataStream(nettyInbound))
104                 }
105         )
106     }
107
108     private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
109         clientContext.clientAddress = try {
110             Option.fromNullable(connection.address().address)
111         } catch (ex: Exception) {
112             None
113         }
114         clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
115     }
116
117     private fun getSslSession(connection: Connection) = Option.fromNullable(
118             connection
119                     .channel()
120                     .pipeline()
121                     .get(SslHandler::class.java)
122                     ?.engine()
123                     ?.session)
124
125     private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
126             sslSession
127                     .peerCertificates
128                     .firstOption()
129                     .flatMap { Option.fromNullable(it as? X509Certificate) }
130
131     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
132             .receive()
133             .retain()
134
135     private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
136             onReadIdle(timeout.toMillis()) {
137                 logger.info(ctx) {
138                     "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
139                 }
140                 disconnectClient(ctx)
141             }
142
143
144     private fun Connection.disconnectClient(ctx: ClientContext) {
145         channel().close().addListener {
146             logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
147             if (it.isSuccess)
148                 logger.debug(ctx) { "Channel closed successfully." }
149             else
150                 logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
151         }
152     }
153
154     private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
155             onDispose {
156                 metrics.notifyClientDisconnected()
157                 logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
158             }
159
160     companion object {
161         private val logger = Logger(NettyTcpServer::class)
162     }
163 }