Upgrade hv-ves, reactor, protobuf and sdk versions
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-server / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / networking.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2019-2020 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl
21
22 import arrow.core.Option
23 import arrow.core.Try
24 import arrow.syntax.collections.firstOption
25 import io.netty.handler.ssl.SslHandler
26 import io.netty.util.concurrent.Future
27 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
28 import reactor.core.publisher.Mono
29 import reactor.netty.ByteBufFlux
30 import reactor.netty.Connection
31 import reactor.netty.NettyInbound
32 import reactor.netty.NettyOutbound
33 import java.net.InetAddress
34 import java.net.InetSocketAddress
35 import java.security.cert.X509Certificate
36 import javax.net.ssl.SSLSession
37
38 internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost"
39
40 internal fun Connection.getSslSession(): Option<SSLSession> =
41         Option.fromNullable(
42                 channel()
43                         .pipeline()
44                         .get(SslHandler::class.java)
45                         ?.engine()
46                         ?.session
47         )
48
49 internal fun SSLSession.findClientCert(): Option<X509Certificate> =
50         peerCertificates
51                 .firstOption()
52                 .flatMap { Option.fromNullable(it as? X509Certificate) }
53
54 internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) =
55         nettyInboud.withConnection(task)
56
57 internal fun Connection.closeChannel() = channel().close()
58
59 internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) =
60         closeChannel().addListener { task(it) }
61
62 internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T =
63         withConnectionFrom(this) { it.closeChannel() }.let { returnValue }
64
65 internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain()
66
67 //
68 // ClientContext related
69 //
70
71 internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound,
72                                              nettyOutbound: NettyOutbound,
73                                              reactiveTask: (ClientContext) -> Mono<Void>) =
74         ClientContext(nettyOutbound.alloc())
75                 .also { populateClientContextFromInbound(it, nettyInbound) }
76                 .run(reactiveTask)
77
78 internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) =
79         withConnectionFrom(nettyInbound) { connection ->
80             clientContext.clientAddress = Try { (connection.address() as InetSocketAddress).address }.toOption()
81             clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
82         }