*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.None
-import arrow.core.Option
+import arrow.core.Try
import arrow.core.getOrElse
import arrow.effects.IO
-import arrow.syntax.collections.firstOption
-import io.netty.handler.ssl.SslHandler
+import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
import reactor.netty.Connection
import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
-import java.security.cert.X509Certificate
+import java.net.InetAddress
import java.time.Duration
-import javax.net.ssl.SSLSession
/**
this
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
- metrics.notifyClientConnected()
- val clientContext = ClientContext(nettyOutbound.alloc())
- nettyInbound.withConnection {
- populateClientContext(clientContext, it)
- it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
- sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ messageHandlingStream(nettyInbound, nettyOutbound).run {
+ subscribe()
+ nettyOutbound.neverComplete()
}
- }
- logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
- messageHandlingStream(clientContext, nettyInbound).subscribe()
- return nettyOutbound.neverComplete()
- }
+ private fun messageHandlingStream(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ withNewClientContextFrom(nettyInbound, nettyOutbound)
+ { clientContext ->
+ logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
+
+ clientContext.clientAddress
+ .map { acceptIfNotLocalConnection(it, clientContext, nettyInbound) }
+ .getOrElse {
+ logger.warn(clientContext::fullMdc) {
+ "Client address could not be resolved. Discarding connection"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty())
+ }
+ }
- private fun messageHandlingStream(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> =
- collectorProvider(clientContext).fold(
+ private fun acceptIfNotLocalConnection(address: InetAddress,
+ clientContext: ClientContext,
+ nettyInbound: NettyInbound): Mono<Void> =
+ if (address.isLocalClientAddress()) {
+ logger.debug(clientContext) {
+ "Client address resolved to localhost. Discarding connection as suspected healthcheck"
+ }
+ nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
+ } else {
+ acceptClientConnection(clientContext, nettyInbound)
+ }
+
+ private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
+ metrics.notifyClientConnected()
+ logger.info(clientContext::fullMdc) { "Handling new client connection" }
+ return collectorProvider(clientContext).fold(
{
- logger.warn(clientContext::fullMdc) { "Collector not ready. Closing connection..." }
- Mono.empty()
+ logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
+ nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
},
- {
- logger.info(clientContext::fullMdc) { "Handling new connection" }
- nettyInbound.withConnection { conn ->
- conn
- .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
- .logConnectionClosed(clientContext)
- }
- it.handleConnection(createDataStream(nettyInbound))
- }
+ handleClient(clientContext, nettyInbound)
)
-
- private fun populateClientContext(clientContext: ClientContext, connection: Connection) {
- clientContext.clientAddress = try {
- Option.fromNullable(connection.address().address)
- } catch (ex: Exception) {
- None
- }
- clientContext.clientCert = getSslSession(connection).flatMap(::findClientCert)
}
- private fun getSslSession(connection: Connection) = Option.fromNullable(
+ private fun handleClient(clientContext: ClientContext,
+ nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+ withConnectionFrom(nettyInbound) { connection ->
connection
- .channel()
- .pipeline()
- .get(SslHandler::class.java)
- ?.engine()
- ?.session)
-
- private fun findClientCert(sslSession: SSLSession): Option<X509Certificate> =
- sslSession
- .peerCertificates
- .firstOption()
- .flatMap { Option.fromNullable(it as? X509Certificate) }
-
- private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .receive()
- .retain()
+ .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .logConnectionClosed(clientContext)
+ }.run {
+ collector.handleConnection(nettyInbound.createDataStream())
+ }
+ }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {
disconnectClient(ctx)
}
-
- private fun Connection.disconnectClient(ctx: ClientContext) {
- channel().close().addListener {
- logger.debug(ctx::fullMdc, Marker.Exit) { "Closing client channel." }
- if (it.isSuccess)
- logger.debug(ctx) { "Channel closed successfully." }
- else
- logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
- }
- }
+ private fun Connection.disconnectClient(ctx: ClientContext) =
+ closeChannelAndThen {
+ if (it.isSuccess)
+ logger.debug(ctx::fullMdc, Marker.Exit) { "Channel closed successfully." }
+ else
+ logger.warn(ctx::fullMdc, Marker.Exit, { "Channel close failed" }, it.cause())
+ }
private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
onDispose {