0b2997faab2cce84ccc884e34bb101e4e24a2fbd
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / socket / NettyTcpServer.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.socket
21
22 import arrow.core.getOrElse
23 import arrow.effects.IO
24 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
25 import org.onap.dcae.collectors.veshv.boundary.Server
26 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
27 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
28 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
29 import org.onap.dcae.collectors.veshv.utils.ServerHandle
30 import org.onap.dcae.collectors.veshv.utils.logging.Logger
31 import reactor.core.publisher.Mono
32 import reactor.netty.ByteBufFlux
33 import reactor.netty.Connection
34 import reactor.netty.NettyInbound
35 import reactor.netty.NettyOutbound
36 import reactor.netty.tcp.TcpServer
37 import java.time.Duration
38
/**
 * [Server] implementation built on Reactor Netty's [TcpServer].
 *
 * The server listens on the address taken from [ServerConfiguration], optionally secures the
 * transport with the SSL context produced by [ServerSslContextFactory] and hands every accepted
 * connection over to the collector obtained from [CollectorProvider].
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                              private val sslContextFactory: ServerSslContextFactory,
                              private val collectorProvider: CollectorProvider) : Server {

    /**
     * Builds the TCP server (listen address, optional SSL, connection handler) and binds it.
     *
     * @return an [IO] which yields a [ServerHandle] wrapping the bound server
     */
    override fun start(): IO<ServerHandle> = IO {
        val tcpServer = TcpServer.create()
                .addressSupplier { serverConfig.serverListenAddress }
                .configureSsl()
                .handle(this::handleConnection)

        NettyServerHandle(tcpServer.bindNow())
    }

    /**
     * Secures the receiver with the SSL context created from the configured security settings.
     * When the factory does not provide a context the receiver is returned unchanged, i.e. the
     * server accepts plain TCP connections.
     */
    private fun TcpServer.configureSsl() =
            sslContextFactory
                    .createSslContext(serverConfig.securityConfiguration)
                    .map { sslContext ->
                        logger.info("Collector configured with SSL enabled")
                        this.secure { b -> b.sslContext(sslContext) }
                    }.getOrElse {
                        logger.info("Collector configured with SSL disabled")
                        this
                    }

    /**
     * Handles a single accepted connection.
     *
     * When no collector is currently available only a warning is logged and an empty [Mono] is
     * returned. Otherwise the connection gets its idle timeout and close logging configured and the
     * inbound data stream is delegated to the collector.
     *
     * @param nettyInbound inbound side of the connection
     * @param nettyOutbound outbound side of the connection; only its buffer allocator is used here
     * @return completion signal of the connection handling
     */
    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
            collectorProvider().fold(
                    {
                        nettyInbound.withConnection { conn ->
                            logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
                        }
                        // NOTE(review): the connection is not closed explicitly here; presumably
                        // Reactor Netty disposes it once the handler completes - confirm.
                        Mono.empty()
                    },
                    {
                        nettyInbound.withConnection { conn ->
                            logger.info { "Handling connection from ${conn.address()}" }
                            conn.configureIdleTimeout(serverConfig.idleTimeout)
                                    .logConnectionClosed()
                        }
                        it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
                    }
            )

    /**
     * Creates the stream of received buffers. Every buffer is retained before being passed
     * downstream, so releasing it becomes the responsibility of the consumer.
     */
    private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
            .receive()
            .retain()

    /**
     * Registers a read-idle callback which disconnects the client after [timeout] without reads.
     *
     * @return the receiver, to allow call chaining
     */
    private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
        onReadIdle(timeout.toMillis()) {
            logger.info {
                "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
            }
            disconnectClient()
        }
        return this
    }

    /**
     * Closes the underlying channel and logs the outcome of the close operation.
     */
    private fun Connection.disconnectClient() {
        channel().close().addListener {
            if (it.isSuccess)
                logger.debug { "Channel (${address()}) closed successfully." }
            else
                logger.warn("Channel close failed", it.cause())
        }
    }

    /**
     * Logs an informational message once the connection has terminated.
     *
     * @return the receiver, to allow call chaining
     */
    private fun Connection.logConnectionClosed(): Connection {
        // onTerminate() is a Mono<Void>: it only signals completion (or an error) and never emits
        // a value. A value consumer passed to subscribe() would therefore never be invoked, so the
        // log statement is attached to the terminal signal instead.
        onTerminate()
                .doOnTerminate { logger.info("Connection from ${address()} has been closed") }
                .subscribe()
        return this
    }

    companion object {
        private val logger = Logger(NettyTcpServer::class)
    }
}