Correct totalConnections metric 72/76272/7
authorFilip Krzywka <filip.krzywka@nokia.com>
Thu, 24 Jan 2019 10:21:26 +0000 (11:21 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Tue, 29 Jan 2019 11:24:31 +0000 (11:24 +0000)
In previous implementation performed healthcheck on container
was counted as client connection, because metric was notified
for every TCP channel opened. This was making this metric less useful
(to avoid saying "useless" ;) ).

- refactored NettyTcpServer trying to extract functions with logic
not so strictly related to HV-VES behavior. This also resolves
discussions in https://gerrit.onap.org/r/#/c/76274/
- some renames and methods splitting was made in attempt to make code
more readable
- hv-ves should not count connections from either "127.0.0.1" or
"localhost" to his `totalConnections` metric
- removed redundant logging by adding new methods to Logger

Change-Id: I5f10dac8dac82eafd3b0de92a7ec43f2c23b8c16
Issue-ID: DCAEGEN2-1119
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
build/hv-collector-analysis/src/main/resources/onap-detekt-config.yml
development/docker-compose.yml
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt [new file with mode: 0644]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt

index 36f092d..f8d8b1e 100644 (file)
@@ -116,7 +116,7 @@ complexity:
     thresholdInObjects: 11
     thresholdInEnums: 11
     ignoreDeprecated: false
-    ignorePrivate: true
+    ignorePrivate: false
 
 empty-blocks:
   active: true
index b3257b3..c93100e 100644 (file)
@@ -73,7 +73,7 @@ services:
     environment:
       JAVA_OPTS:  "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
     healthcheck:
-      test: curl -f http://localhost:6060/health/ready || exit 1
+      test: ./healthcheck.sh || exit 1
       interval: 10s
       timeout: 3s
       retries: 3
index adc629b..0d07d16 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
-import arrow.core.None
-import arrow.core.Option
+import arrow.core.Try
 import arrow.core.getOrElse
 import arrow.effects.IO
-import arrow.syntax.collections.firstOption
-import io.netty.handler.ssl.SslHandler
+import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -40,14 +37,12 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
 import reactor.netty.Connection
 import reactor.netty.NettyInbound
 import reactor.netty.NettyOutbound
 import reactor.netty.tcp.TcpServer
-import java.security.cert.X509Certificate
+import java.net.InetAddress
 import java.time.Duration
-import javax.net.ssl.SSLSession
 
 
 /**
@@ -82,64 +77,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                         this
                     }
 
-    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
-        metrics.notifyClientConnected()
-        val clientContext = ClientContext(nettyOutbound.alloc())
-        nettyInbound.withConnection {
-            populateClientContext(clientContext, it)
-            it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
-                sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+            messageHandlingStream(nettyInbound, nettyOutbound).run {
+                subscribe()
+                nettyOutbound.neverComplete()
             }
-        }
 
-        logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
-        messageHandlingStream(clientContext, nettyInbound).subscribe()
-        return nettyOutbound.neverComplete()
-    }
+    private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+            withNewClientContextFrom(nettyInbound, nettyOutbound)
+            { clientContext ->
+                logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+
+                clientContext.clientAddress
+                        .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
+                        .getOrElse {
+                            logger.warn(clientContext::fullMdc) {
+                                "Client address could not be resolved. Discarding connection"
+                            }
+                            nettyInbound.closeConnectionAndReturn(Mono.empty())
+                        }
+            }
 
-    private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> =
-        collectorProvider(clientContext).fold(
+    private fun acceptIfNotLocalConnection(address: InetAddress,
+                                           clientContext: ClientContext,
+                                           nettyInbound: NettyInbound): Mono<Void> =
+            if (address.isLocalClientAddress()) {
+                logger.debug(clientContext) {
+                    "Client address resolved to localhost. Discarding connection as suspected healthcheck"
+                }
+                nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
+            } else {
+                acceptClientConnection(clientContext, nettyInbound)
+            }
+
+    private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
+        metrics.notifyClientConnected()
+        logger.info(clientContext::fullMdc) { "Handling new client connection" }
+        return collectorProvider(clientContext).fold(
                 {
-                    logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
-                    Mono.empty()
+                    logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
+                    nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
                 },
-                {
-                    logger.info(clientContext::fullMdc) { "Handling new connection" }
-                    nettyInbound.withConnection { conn ->
-                        conn
-                                .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
-                                .logConnectionClosed(clientContext)
-                    }
-                    it.handleConnection(createDataStream(nettyInbound))
-                }
+                handleClient(clientContext, nettyInbound)
         )
-
-    private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
-        clientContext.clientAddress = try {
-            Option.fromNullable(connection.address().address)
-        } catch (ex: Exception) {
-            None
-        }
-        clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
     }
 
-    private fun getSslSession(connection: Connection) = Option.fromNullable(
+    private fun handleClient(clientContext: ClientContext,
+                             nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+        withConnectionFrom(nettyInbound) { connection ->
             connection
-                    .channel()
-                    .pipeline()
-                    .get(SslHandler::class.java)
-                    ?.engine()
-                    ?.session)
-
-    private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
-            sslSession
-                    .peerCertificates
-                    .firstOption()
-                    .flatMap { Option.fromNullable(it as? X509Certificate) }
-
-    private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
-            .receive()
-            .retain()
+                    .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+                    .logConnectionClosed(clientContext)
+        }.run {
+            collector.handleConnection(nettyInbound.createDataStream())
+        }
+    }
 
     private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
             onReadIdle(timeout.toMillis()) {
@@ -149,16 +141,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                 disconnectClient(ctx)
             }
 
-
-    private fun Connection.disconnectClient(ctx: ClientContext) {
-        channel().close().addListener {
-            logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
-            if (it.isSuccess)
-                logger.debug(ctx) { "Channel closed successfully." }
-            else
-                logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
-        }
-    }
+    private fun Connection.disconnectClient(ctx: ClientContext) =
+            closeChannelAndThen {
+                if (it.isSuccess)
+                    logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
+                else
+                    logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
+            }
 
     private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
             onDispose {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt
new file mode 100644 (file)
index 0000000..91f502e
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.socket
+
+import arrow.core.Option
+import arrow.core.Try
+import arrow.syntax.collections.firstOption
+import io.netty.handler.ssl.SslHandler
+import io.netty.util.concurrent.Future
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import reactor.core.publisher.Mono
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import java.net.InetAddress
+import java.security.cert.X509Certificate
+import javax.net.ssl.SSLSession
+
+internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost"
+
+internal fun Connection.getSslSession(): Option<SSLSession> =
+        Option.fromNullable(
+                channel()
+                        .pipeline()
+                        .get(SslHandler::class.java)
+                        ?.engine()
+                        ?.session
+        )
+
+internal fun SSLSession.findClientCert(): Option<X509Certificate> =
+        peerCertificates
+                .firstOption()
+                .flatMap { Option.fromNullable(it as? X509Certificate) }
+
+internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) =
+        nettyInboud.withConnection(task)
+
+internal fun Connection.closeChannel() = channel().close()
+
+internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) =
+        closeChannel().addListener { task(it) }
+
+internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T =
+        withConnectionFrom(this) { it.closeChannel() }.let { returnValue }
+
+internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain()
+
+//
+// ClientContext related
+//
+
+internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound,
+                                             nettyOutbound: NettyOutbound,
+                                             reactiveTask: (ClientContext) -> Mono<Void>) =
+        ClientContext(nettyOutbound.alloc())
+                .also { populateClientContextFromInbound(it, nettyInbound) }
+                .run(reactiveTask)
+
+internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) =
+        withConnectionFrom(nettyInbound) { connection ->
+            clientContext.clientAddress = Try { connection.address().address }.toOption()
+            clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
+        }
\ No newline at end of file
index ade9b48..7fcc73a 100644 (file)
@@ -64,6 +64,9 @@ class Logger(logger: org.slf4j.Logger) {
     fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
         errorLogger.withMdc(mdc) { log(marker, message()) }
 
+    fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+        errorLogger.withMdc(mdc) { log(marker, message(), t) }
+
     // WARN
 
     fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
@@ -81,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) {
     fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
         warnLogger.withMdc(mdc) { log(marker, message()) }
 
+    fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+        warnLogger.withMdc(mdc) { log(marker, message(), t) }
+
     // INFO
 
     fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
@@ -144,6 +150,7 @@ abstract class AtLevelLogger {
     abstract fun log(message: String)
     abstract fun log(message: String, t: Throwable)
     abstract fun log(marker: Marker, message: String)
+    abstract fun log(marker: Marker, message: String, t: Throwable)
 
     open val enabled: Boolean
         get() = true
@@ -187,6 +194,10 @@ object OffLevelLogger : AtLevelLogger() {
     override fun log(marker: Marker, message: String) {
         // do not log anything
     }
+
+    override fun log(marker: Marker, message: String, t: Throwable) {
+        // do no log anything
+    }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -203,6 +214,11 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         withAdditionalMdc(marker.mdc) {
             logger.error(marker.slf4jMarker, message)
         }
+
+    override fun log(marker: Marker, message: String, t: Throwable) =
+        withAdditionalMdc(marker.mdc) {
+            logger.error(marker.slf4jMarker, message, t)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -219,6 +235,11 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         withAdditionalMdc(marker.mdc) {
             logger.warn(marker.slf4jMarker, message)
         }
+
+    override fun log(marker: Marker, message: String, t: Throwable) =
+        withAdditionalMdc(marker.mdc) {
+            logger.warn(marker.slf4jMarker, message, t)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -235,6 +256,11 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         withAdditionalMdc(marker.mdc) {
             logger.info(marker.slf4jMarker, message)
         }
+
+    override fun log(marker: Marker, message: String, t: Throwable) =
+        withAdditionalMdc(marker.mdc) {
+            logger.info(marker.slf4jMarker, message, t)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -251,6 +277,11 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         withAdditionalMdc(marker.mdc) {
             logger.debug(marker.slf4jMarker, message)
         }
+
+    override fun log(marker: Marker, message: String, t: Throwable) =
+        withAdditionalMdc(marker.mdc) {
+            logger.debug(marker.slf4jMarker, message, t)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -267,4 +298,9 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
         withAdditionalMdc(marker.mdc) {
             logger.trace(marker.slf4jMarker, message)
         }
+
+    override fun log(marker: Marker, message: String, t: Throwable) =
+        withAdditionalMdc(marker.mdc) {
+            logger.trace(marker.slf4jMarker, message, t)
+        }
 }