Custom detekt rule for logger usage check
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / socket / NettyTcpServer.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.socket
21
22 import arrow.core.getOrElse
23 import arrow.effects.IO
24 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
25 import org.onap.dcae.collectors.veshv.boundary.Server
26 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
27 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
28 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
29 import org.onap.dcae.collectors.veshv.utils.ServerHandle
30 import org.onap.dcae.collectors.veshv.utils.logging.Logger
31 import reactor.core.publisher.Mono
32 import reactor.netty.ByteBufFlux
33 import reactor.netty.Connection
34 import reactor.netty.NettyInbound
35 import reactor.netty.NettyOutbound
36 import reactor.netty.tcp.TcpServer
37 import java.time.Duration
38
39 /**
40  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
41  * @since May 2018
42  */
43 internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
44                               private val sslContextFactory: ServerSslContextFactory,
45                               private val collectorProvider: CollectorProvider) : Server {
46
47     override fun start(): IO<ServerHandle> = IO {
48         val tcpServer = TcpServer.create()
49                 .addressSupplier { serverConfig.serverListenAddress }
50                 .configureSsl()
51                 .handle(this::handleConnection)
52
53         NettyServerHandle(tcpServer.bindNow())
54     }
55
56     private fun TcpServer.configureSsl() =
57             sslContextFactory
58                     .createSslContext(serverConfig.securityConfiguration)
59                     .map { sslContext ->
60                         this.secure { b -> b.sslContext(sslContext) }
61                     }.getOrElse { this }
62
63     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
64             collectorProvider().fold(
65                     {
66                         nettyInbound.withConnection { conn ->
67                             logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
68                         }
69                         Mono.empty()
70                     },
71                     {
72                         nettyInbound.withConnection { conn ->
73                             logger.info { "Handling connection from ${conn.address()}" }
74                             conn.configureIdleTimeout(serverConfig.idleTimeout)
75                                     .logConnectionClosed()
76                         }
77                         it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
78                     }
79             )
80
81     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
82             .receive()
83             .retain()
84
85     private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
86         onReadIdle(timeout.toMillis()) {
87             logger.info {
88                 "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
89             }
90             disconnectClient()
91         }
92         return this
93     }
94
95     private fun Connection.disconnectClient() {
96         channel().close().addListener {
97             if (it.isSuccess)
98                 logger.debug { "Channel (${address()}) closed successfully." }
99             else
100                 logger.warn("Channel close failed", it.cause())
101         }
102     }
103
104     private fun Connection.logConnectionClosed(): Connection {
105         onTerminate().subscribe {
106             logger.info("Connection from ${address()} has been closed")
107         }
108         return this
109     }
110
111     companion object {
112         private val logger = Logger(NettyTcpServer::class)
113     }
114 }