Correct totalConnections metric
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / socket / NettyTcpServer.kt
index adc629b..0d07d16 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
-import arrow.core.None
-import arrow.core.Option
+import arrow.core.Try
 import arrow.core.getOrElse
 import arrow.effects.IO
-import arrow.syntax.collections.firstOption
-import io.netty.handler.ssl.SslHandler
+import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -40,14 +37,12 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
 import reactor.netty.Connection
 import reactor.netty.NettyInbound
 import reactor.netty.NettyOutbound
 import reactor.netty.tcp.TcpServer
-import java.security.cert.X509Certificate
+import java.net.InetAddress
 import java.time.Duration
-import javax.net.ssl.SSLSession
 
 
 /**
@@ -82,64 +77,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                         this
                     }
 
-    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
-        metrics.notifyClientConnected()
-        val clientContext = ClientContext(nettyOutbound.alloc())
-        nettyInbound.withConnection {
-            populateClientContext(clientContext, it)
-            it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
-                sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+            messageHandlingStream(nettyInbound, nettyOutbound).run {
+                subscribe()
+                nettyOutbound.neverComplete()
             }
-        }
 
-        logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
-        messageHandlingStream(clientContext, nettyInbound).subscribe()
-        return nettyOutbound.neverComplete()
-    }
+    private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+            withNewClientContextFrom(nettyInbound, nettyOutbound)
+            { clientContext ->
+                logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+
+                clientContext.clientAddress
+                        .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
+                        .getOrElse {
+                            logger.warn(clientContext::fullMdc) {
+                                "Client address could not be resolved. Discarding connection"
+                            }
+                            nettyInbound.closeConnectionAndReturn(Mono.empty())
+                        }
+            }
 
-    private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> =
-        collectorProvider(clientContext).fold(
+    private fun acceptIfNotLocalConnection(address: InetAddress,
+                                           clientContext: ClientContext,
+                                           nettyInbound: NettyInbound): Mono<Void> =
+            if (address.isLocalClientAddress()) {
+                logger.debug(clientContext) {
+                    "Client address resolved to localhost. Discarding connection as suspected healthcheck"
+                }
+                nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
+            } else {
+                acceptClientConnection(clientContext, nettyInbound)
+            }
+
+    private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
+        metrics.notifyClientConnected()
+        logger.info(clientContext::fullMdc) { "Handling new client connection" }
+        return collectorProvider(clientContext).fold(
                 {
-                    logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
-                    Mono.empty()
+                    logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
+                    nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
                 },
-                {
-                    logger.info(clientContext::fullMdc) { "Handling new connection" }
-                    nettyInbound.withConnection { conn ->
-                        conn
-                                .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
-                                .logConnectionClosed(clientContext)
-                    }
-                    it.handleConnection(createDataStream(nettyInbound))
-                }
+                handleClient(clientContext, nettyInbound)
         )
-
-    private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
-        clientContext.clientAddress = try {
-            Option.fromNullable(connection.address().address)
-        } catch (ex: Exception) {
-            None
-        }
-        clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
     }
 
-    private fun getSslSession(connection: Connection) = Option.fromNullable(
+    private fun handleClient(clientContext: ClientContext,
+                             nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+        withConnectionFrom(nettyInbound) { connection ->
             connection
-                    .channel()
-                    .pipeline()
-                    .get(SslHandler::class.java)
-                    ?.engine()
-                    ?.session)
-
-    private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
-            sslSession
-                    .peerCertificates
-                    .firstOption()
-                    .flatMap { Option.fromNullable(it as? X509Certificate) }
-
-    private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
-            .receive()
-            .retain()
+                    .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+                    .logConnectionClosed(clientContext)
+        }.run {
+            collector.handleConnection(nettyInbound.createDataStream())
+        }
+    }
 
     private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
             onReadIdle(timeout.toMillis()) {
@@ -149,16 +141,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                 disconnectClient(ctx)
             }
 
-
-    private fun Connection.disconnectClient(ctx: ClientContext) {
-        channel().close().addListener {
-            logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
-            if (it.isSuccess)
-                logger.debug(ctx) { "Channel closed successfully." }
-            else
-                logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
-        }
-    }
+    private fun Connection.disconnectClient(ctx: ClientContext) =
+            closeChannelAndThen {
+                if (it.isSuccess)
+                    logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
+                else
+                    logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
+            }
 
     private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
             onDispose {