*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.core.None
+import arrow.core.Option
import arrow.core.getOrElse
+import arrow.core.toOption
import arrow.effects.IO
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ClientContext
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
import java.time.Duration
+import java.lang.Exception
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
val clientContext = ClientContext(nettyOutbound.alloc())
nettyInbound.withConnection {
- clientContext.clientAddress = it.address()
+ populateClientContext(clientContext, it)
+ it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
+ sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+ }
+
}
- logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
+ logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
return collectorProvider(clientContext).fold(
{
- logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
Mono.empty()
},
{
- logger.info(clientContext::asMap) { "Handling new connection" }
+ logger.info(clientContext::fullMdc) { "Handling new connection" }
nettyInbound.withConnection { conn ->
conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
)
}
+ private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
+ clientContext.clientAddress = try {
+ Option.fromNullable(connection.address().address)
+ } catch (ex: Exception) {
+ None
+ }
+ clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
+ }
+
+ private fun getSslSession(connection: Connection) = Option.fromNullable(
+ connection
+ .channel()
+ .pipeline()
+ .get(SslHandler::class.java)
+ ?.engine()
+ ?.session)
+
+ private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
+ sslSession
+ .peerCertificates
+ .firstOption()
+ .flatMap { Option.fromNullable(it as? X509Certificate) }
+
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
- logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
if (it.isSuccess)
logger.debug(ctx) { "Channel closed successfully." }
else
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
// TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
- logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
+ logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
}
return this
}