2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.socket
22 import arrow.core.None
23 import arrow.core.Option
24 import arrow.core.getOrElse
25 import arrow.core.toOption
26 import arrow.effects.IO
27 import arrow.syntax.collections.firstOption
28 import io.netty.handler.ssl.SslHandler
29 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
30 import org.onap.dcae.collectors.veshv.boundary.Server
31 import org.onap.dcae.collectors.veshv.model.ClientContext
32 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
33 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
34 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
35 import org.onap.dcae.collectors.veshv.utils.logging.Marker
36 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
37 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
38 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
39 import org.onap.dcae.collectors.veshv.utils.ServerHandle
40 import org.onap.dcae.collectors.veshv.utils.logging.Logger
41 import reactor.core.publisher.Mono
42 import reactor.netty.ByteBufFlux
43 import reactor.netty.Connection
44 import reactor.netty.NettyInbound
45 import reactor.netty.NettyOutbound
46 import reactor.netty.tcp.TcpServer
47 import java.time.Duration
48 import java.lang.Exception
49 import java.security.cert.X509Certificate
50 import javax.net.ssl.SSLSession
54 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
57 internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
58 private val sslContextFactory: ServerSslContextFactory,
59 private val collectorProvider: CollectorProvider) : Server {
61 override fun start(): IO<ServerHandle> = IO {
62 val tcpServer = TcpServer.create()
63 .addressSupplier { serverConfig.serverListenAddress }
65 .handle(this::handleConnection)
67 NettyServerHandle(tcpServer.bindNow())
70 private fun TcpServer.configureSsl() =
72 .createSslContext(serverConfig.securityConfiguration)
74 logger.info { "Collector configured with SSL enabled" }
75 this.secure { b -> b.sslContext(sslContext) }
77 logger.info { "Collector configured with SSL disabled" }
81 private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
82 val clientContext = ClientContext(nettyOutbound.alloc())
83 nettyInbound.withConnection {
84 populateClientContext(clientContext, it)
85 it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
86 sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
91 logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
92 return collectorProvider(clientContext).fold(
94 logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
98 logger.info(clientContext::fullMdc) { "Handling new connection" }
99 nettyInbound.withConnection { conn ->
100 conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
101 .logConnectionClosed(clientContext)
103 it.handleConnection(createDataStream(nettyInbound))
108 private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
109 clientContext.clientAddress = try {
110 Option.fromNullable(connection.address().address)
111 } catch (ex: Exception) {
114 clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
117 private fun getSslSession(connection: Connection) = Option.fromNullable(
121 .get(SslHandler::class.java)
125 private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
129 .flatMap { Option.fromNullable(it as? X509Certificate) }
131 private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
135 private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
136 onReadIdle(timeout.toMillis()) {
138 "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
140 disconnectClient(ctx)
145 private fun Connection.disconnectClient(ctx: ClientContext) {
146 channel().close().addListener {
147 logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
149 logger.debug(ctx) { "Channel closed successfully." }
151 logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
155 private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
156 onTerminate().subscribe {
157 // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
158 logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
164 private val logger = Logger(NettyTcpServer::class)