From e7204cbcf6af61856330cffc541b6f5c78476a09 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 24 Jan 2019 11:21:26 +0100 Subject: [PATCH] Correct totalConnections metric In previous implementation performed healthcheck on container was counted as client connection, because metric was notified for every TCP channel opened. This was making this metric less useful (to avoid saying "useless" ;) ). - refactored NettyTcpServer trying to extract functions with logic not so strictly related to HV-VES behavior. This also resolves discussions in https://gerrit.onap.org/r/#/c/76274/ - some renames and methods splitting was made in attempt to make code more readable - hv-ves should not count connections from either "127.0.0.1" or "localhost" to his `totalConnections` metric - removed redundant logging by adding new methods to Logger Change-Id: I5f10dac8dac82eafd3b0de92a7ec43f2c23b8c16 Issue-ID: DCAEGEN2-1119 Signed-off-by: Filip Krzywka --- .../src/main/resources/onap-detekt-config.yml | 2 +- development/docker-compose.yml | 2 +- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 123 ++++++++++----------- .../collectors/veshv/impl/socket/networking.kt | 81 ++++++++++++++ .../dcae/collectors/veshv/utils/logging/Logger.kt | 36 ++++++ 5 files changed, 175 insertions(+), 69 deletions(-) create mode 100644 sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt diff --git a/build/hv-collector-analysis/src/main/resources/onap-detekt-config.yml b/build/hv-collector-analysis/src/main/resources/onap-detekt-config.yml index 36f092dd..f8d8b1ee 100644 --- a/build/hv-collector-analysis/src/main/resources/onap-detekt-config.yml +++ b/build/hv-collector-analysis/src/main/resources/onap-detekt-config.yml @@ -116,7 +116,7 @@ complexity: thresholdInObjects: 11 thresholdInEnums: 11 ignoreDeprecated: false - ignorePrivate: true + ignorePrivate: false empty-blocks: active: true diff --git a/development/docker-compose.yml b/development/docker-compose.yml index b3257b3d..c93100ef 100644 --- a/development/docker-compose.yml +++ b/development/docker-compose.yml @@ -73,7 +73,7 @@ services: environment: JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml" healthcheck: - test: curl -f http://localhost:6060/health/ready || exit 1 + test: ./healthcheck.sh || exit 1 interval: 10s timeout: 3s retries: 3 diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index adc629bc..0d07d167 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,18 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.socket -import arrow.core.None -import arrow.core.Option +import arrow.core.Try import arrow.core.getOrElse import arrow.effects.IO -import arrow.syntax.collections.firstOption -import io.netty.handler.ssl.SslHandler +import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -40,14 +37,12 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.Marker import reactor.core.publisher.Mono -import reactor.netty.ByteBufFlux import reactor.netty.Connection import reactor.netty.NettyInbound import reactor.netty.NettyOutbound import reactor.netty.tcp.TcpServer -import java.security.cert.X509Certificate +import java.net.InetAddress import java.time.Duration -import javax.net.ssl.SSLSession /** @@ -82,64 +77,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, this } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono { - metrics.notifyClientConnected() - val clientContext = ClientContext(nettyOutbound.alloc()) - nettyInbound.withConnection { - populateClientContext(clientContext, it) - it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession -> - sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name } + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = + messageHandlingStream(nettyInbound, nettyOutbound).run { + subscribe() + nettyOutbound.neverComplete() } - } - logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } - messageHandlingStream(clientContext, nettyInbound).subscribe() - return nettyOutbound.neverComplete() - } + private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono = + withNewClientContextFrom(nettyInbound, nettyOutbound) + { clientContext -> + logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" } + + clientContext.clientAddress + .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) } + .getOrElse { + logger.warn(clientContext::fullMdc) { + "Client address could not be resolved. Discarding connection" + } + nettyInbound.closeConnectionAndReturn(Mono.empty()) + } + } - private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono = - collectorProvider(clientContext).fold( + private fun acceptIfNotLocalConnection(address: InetAddress, + clientContext: ClientContext, + nettyInbound: NettyInbound): Mono = + if (address.isLocalClientAddress()) { + logger.debug(clientContext) { + "Client address resolved to localhost. Discarding connection as suspected healthcheck" + } + nettyInbound.closeConnectionAndReturn(Mono.empty()) + } else { + acceptClientConnection(clientContext, nettyInbound) + } + + private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono { + metrics.notifyClientConnected() + logger.info(clientContext::fullMdc) { "Handling new client connection" } + return collectorProvider(clientContext).fold( { - logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." } - Mono.empty() + logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" } + nettyInbound.closeConnectionAndReturn(Mono.empty()) }, - { - logger.info(clientContext::fullMdc) { "Handling new connection" } - nettyInbound.withConnection { conn -> - conn - .configureIdleTimeout(clientContext, serverConfig.idleTimeout) - .logConnectionClosed(clientContext) - } - it.handleConnection(createDataStream(nettyInbound)) - } + handleClient(clientContext, nettyInbound) ) - - private fun populateClientContext(clientContext: ClientContext, connection: Connection) { - clientContext.clientAddress = try { - Option.fromNullable(connection.address().address) - } catch (ex: Exception) { - None - } - clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert) } - private fun getSslSession(connection: Connection) = Option.fromNullable( + private fun handleClient(clientContext: ClientContext, + nettyInbound: NettyInbound): (Collector) -> Mono = { collector -> + withConnectionFrom(nettyInbound) { connection -> connection - .channel() - .pipeline() - .get(SslHandler::class.java) - ?.engine() - ?.session) - - private fun findClientCert(sslSession: SSLSession): Option = - sslSession - .peerCertificates - .firstOption() - .flatMap { Option.fromNullable(it as? X509Certificate) } - - private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .receive() - .retain() + .configureIdleTimeout(clientContext, serverConfig.idleTimeout) + .logConnectionClosed(clientContext) + }.run { + collector.handleConnection(nettyInbound.createDataStream()) + } + } private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection = onReadIdle(timeout.toMillis()) { @@ -149,16 +141,13 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, disconnectClient(ctx) } - - private fun Connection.disconnectClient(ctx: ClientContext) { - channel().close().addListener { - logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." } - if (it.isSuccess) - logger.debug(ctx) { "Channel closed successfully." } - else - logger.withWarn(ctx) { log("Channel close failed", it.cause()) } - } - } + private fun Connection.disconnectClient(ctx: ClientContext) = + closeChannelAndThen { + if (it.isSuccess) + logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." } + else + logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause()) + } private fun Connection.logConnectionClosed(ctx: ClientContext): Connection = onDispose { diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt new file mode 100644 index 00000000..91f502e6 --- /dev/null +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.socket + +import arrow.core.Option +import arrow.core.Try +import arrow.syntax.collections.firstOption +import io.netty.handler.ssl.SslHandler +import io.netty.util.concurrent.Future +import org.onap.dcae.collectors.veshv.model.ClientContext +import reactor.core.publisher.Mono +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import java.net.InetAddress +import java.security.cert.X509Certificate +import javax.net.ssl.SSLSession + +internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost" + +internal fun Connection.getSslSession(): Option = + Option.fromNullable( + channel() + .pipeline() + .get(SslHandler::class.java) + ?.engine() + ?.session + ) + +internal fun SSLSession.findClientCert(): Option = + peerCertificates + .firstOption() + .flatMap { Option.fromNullable(it as? X509Certificate) } + +internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) = + nettyInboud.withConnection(task) + +internal fun Connection.closeChannel() = channel().close() + +internal fun Connection.closeChannelAndThen(task: (Future) -> Unit) = + closeChannel().addListener { task(it) } + +internal fun NettyInbound.closeConnectionAndReturn(returnValue: T): T = + withConnectionFrom(this) { it.closeChannel() }.let { returnValue } + +internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain() + +// +// ClientContext related +// + +internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound, + nettyOutbound: NettyOutbound, + reactiveTask: (ClientContext) -> Mono) = + ClientContext(nettyOutbound.alloc()) + .also { populateClientContextFromInbound(it, nettyInbound) } + .run(reactiveTask) + +internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) = + withConnectionFrom(nettyInbound) { connection -> + clientContext.clientAddress = Try { connection.address().address }.toOption() + clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() } + } \ No newline at end of file diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index ade9b480..7fcc73a0 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -64,6 +64,9 @@ class Logger(logger: org.slf4j.Logger) { fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = errorLogger.withMdc(mdc) { log(marker, message()) } + fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + errorLogger.withMdc(mdc) { log(marker, message(), t) } + // WARN fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() @@ -81,6 +84,9 @@ class Logger(logger: org.slf4j.Logger) { fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = warnLogger.withMdc(mdc) { log(marker, message()) } + fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + warnLogger.withMdc(mdc) { log(marker, message(), t) } + // INFO fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() @@ -144,6 +150,7 @@ abstract class AtLevelLogger { abstract fun log(message: String) abstract fun log(message: String, t: Throwable) abstract fun log(marker: Marker, message: String) + abstract fun log(marker: Marker, message: String, t: Throwable) open val enabled: Boolean get() = true @@ -187,6 +194,10 @@ object OffLevelLogger : AtLevelLogger() { override fun log(marker: Marker, message: String) { // do not log anything } + + override fun log(marker: Marker, message: String, t: Throwable) { + // do no log anything + } } @Suppress("SuboptimalLoggerUsage") @@ -203,6 +214,11 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.error(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.error(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -219,6 +235,11 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.warn(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.warn(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -235,6 +256,11 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.info(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.info(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -251,6 +277,11 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.debug(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.debug(marker.slf4jMarker, message, t) + } } @Suppress("SuboptimalLoggerUsage") @@ -267,4 +298,9 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { withAdditionalMdc(marker.mdc) { logger.trace(marker.slf4jMarker, message) } + + override fun log(marker: Marker, message: String, t: Throwable) = + withAdditionalMdc(marker.mdc) { + logger.trace(marker.slf4jMarker, message, t) + } } -- 2.16.6