import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
sslContextFactory
.createSslContext(serverConfig.securityConfiguration)
.map { sslContext ->
- logger.info("Collector configured with SSL enabled")
+ logger.info { "Collector configured with SSL enabled" }
this.secure { b -> b.sslContext(sslContext) }
}.getOrElse {
- logger.info("Collector configured with SSL disabled")
+ logger.info { "Collector configured with SSL disabled" }
this
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- collectorProvider().fold(
- {
- nettyInbound.withConnection { conn ->
- logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
- }
- Mono.empty()
- },
- {
- nettyInbound.withConnection { conn ->
- logger.info { "Handling connection from ${conn.address()}" }
- conn.configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- }
- it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ val clientContext = ClientContext(nettyOutbound.alloc())
+ nettyInbound.withConnection {
+ clientContext.clientAddress = it.address()
+ }
+
+ return collectorProvider(clientContext).fold(
+ {
+ logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling new connection" }
+ nettyInbound.withConnection { conn ->
+ conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ .logConnectionClosed(clientContext)
}
- )
+ it.handleConnection(createDataStream(nettyInbound))
+ }
+ )
+ }
private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+ private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
- logger.info {
+ logger.info(ctx) {
"Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
- disconnectClient()
+ disconnectClient(ctx)
}
return this
}
- private fun Connection.disconnectClient() {
+ private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${address()}) closed successfully." }
+ logger.debug(ctx) { "Channel closed successfully." }
else
- logger.warn("Channel close failed", it.cause())
+ logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
}
}
- private fun Connection.logConnectionClosed(): Connection {
+ private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
onTerminate().subscribe {
- logger.info("Connection from ${address()} has been closed")
+ logger.info(ctx) { "Connection has been closed" }
}
return this
}