Add all required and reasonable MDCs
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / socket / NettyTcpServer.kt
index 6f02d43..d8d786b 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
+import arrow.core.None
+import arrow.core.Option
 import arrow.core.getOrElse
+import arrow.core.toOption
 import arrow.effects.IO
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,6 +45,10 @@ import reactor.netty.NettyInbound
 import reactor.netty.NettyOutbound
 import reactor.netty.tcp.TcpServer
 import java.time.Duration
+import java.lang.Exception
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -72,17 +81,21 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
         val clientContext = ClientContext(nettyOutbound.alloc())
         nettyInbound.withConnection {
-            clientContext.clientAddress = it.address()
+            populateClientContext(clientContext, it)
+            it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
+                sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+            }
+
         }
 
-        logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
+        logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
         return collectorProvider(clientContext).fold(
                 {
-                    logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+                    logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
                     Mono.empty()
                 },
                 {
-                    logger.info(clientContext::asMap) { "Handling new connection" }
+                    logger.info(clientContext::fullMdc) { "Handling new connection" }
                     nettyInbound.withConnection { conn ->
                         conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
                                 .logConnectionClosed(clientContext)
@@ -92,6 +105,29 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
         )
     }
 
+    private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
+        clientContext.clientAddress = try {
+            Option.fromNullable(connection.address().address)
+        } catch (ex: Exception) {
+            None
+        }
+        clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
+    }
+
+    private fun getSslSession(connection: Connection) = Option.fromNullable(
+            connection
+                    .channel()
+                    .pipeline()
+                    .get(SslHandler::class.java)
+                    ?.engine()
+                    ?.session)
+
+    private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
+            sslSession
+                    .peerCertificates
+                    .firstOption()
+                    .flatMap { Option.fromNullable(it as? X509Certificate) }
+
     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
             .receive()
             .retain()
@@ -108,7 +144,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
 
     private fun Connection.disconnectClient(ctx: ClientContext) {
         channel().close().addListener {
-            logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
+            logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
             if (it.isSuccess)
                 logger.debug(ctx) { "Channel closed successfully." }
             else
@@ -119,7 +155,7 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
     private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
         onTerminate().subscribe {
             // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
-            logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
+            logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
         }
         return this
     }