Add log diagnostic context
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / socket / NettyTcpServer.kt
index 0b2997f..2d29fe9 100644 (file)
@@ -23,6 +23,10 @@ import arrow.core.getOrElse
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -57,57 +61,61 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
             sslContextFactory
                     .createSslContext(serverConfig.securityConfiguration)
                     .map { sslContext ->
-                        logger.info("Collector configured with SSL enabled")
+                        logger.info { "Collector configured with SSL enabled" }
                         this.secure { b -> b.sslContext(sslContext) }
                     }.getOrElse {
-                        logger.info("Collector configured with SSL disabled")
+                        logger.info { "Collector configured with SSL disabled" }
                         this
                     }
 
-    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
-            collectorProvider().fold(
-                    {
-                        nettyInbound.withConnection { conn ->
-                            logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
-                        }
-                        Mono.empty()
-                    },
-                    {
-                        nettyInbound.withConnection { conn ->
-                            logger.info { "Handling connection from ${conn.address()}" }
-                            conn.configureIdleTimeout(serverConfig.idleTimeout)
-                                    .logConnectionClosed()
-                        }
-                        it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+        val clientContext = ClientContext(nettyOutbound.alloc())
+        nettyInbound.withConnection {
+            clientContext.clientAddress = it.address()
+        }
+
+        return collectorProvider(clientContext).fold(
+                {
+                    logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+                    Mono.empty()
+                },
+                {
+                    logger.info { "Handling new connection" }
+                    nettyInbound.withConnection { conn ->
+                        conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+                                .logConnectionClosed(clientContext)
                     }
-            )
+                    it.handleConnection(createDataStream(nettyInbound))
+                }
+        )
+    }
 
     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
             .receive()
             .retain()
 
-    private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+    private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
         onReadIdle(timeout.toMillis()) {
-            logger.info {
+            logger.info(ctx) {
                 "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
             }
-            disconnectClient()
+            disconnectClient(ctx)
         }
         return this
     }
 
-    private fun Connection.disconnectClient() {
+    private fun Connection.disconnectClient(ctx: ClientContext) {
         channel().close().addListener {
             if (it.isSuccess)
-                logger.debug { "Channel (${address()}) closed successfully." }
+                logger.debug(ctx) { "Channel closed successfully." }
             else
-                logger.warn("Channel close failed", it.cause())
+                logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
         }
     }
 
-    private fun Connection.logConnectionClosed(): Connection {
+    private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
         onTerminate().subscribe {
-            logger.info("Connection from ${address()} has been closed")
+            logger.info(ctx) { "Connection has been closed" }
         }
         return this
     }