a1e5b8fd583177285aafac23e3acc8d435be86fe
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / socket / networking.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.socket
21
22 import arrow.core.Option
23 import arrow.core.Try
24 import arrow.syntax.collections.firstOption
25 import io.netty.handler.ssl.SslHandler
26 import io.netty.util.concurrent.Future
27 import org.onap.dcae.collectors.veshv.model.ClientContext
28 import reactor.core.publisher.Mono
29 import reactor.netty.ByteBufFlux
30 import reactor.netty.Connection
31 import reactor.netty.NettyInbound
32 import reactor.netty.NettyOutbound
33 import java.net.InetAddress
34 import java.security.cert.X509Certificate
35 import javax.net.ssl.SSLSession
36
37 internal fun InetAddress.isLocalClientAddress() = hostAddress == "127.0.0.1" || hostName == "localhost"
38
39 internal fun Connection.getSslSession(): Option<SSLSession> =
40         Option.fromNullable(
41                 channel()
42                         .pipeline()
43                         .get(SslHandler::class.java)
44                         ?.engine()
45                         ?.session
46         )
47
48 internal fun SSLSession.findClientCert(): Option<X509Certificate> =
49         peerCertificates
50                 .firstOption()
51                 .flatMap { Option.fromNullable(it as? X509Certificate) }
52
53 internal fun withConnectionFrom(nettyInboud: NettyInbound, task: (Connection) -> Unit) =
54         nettyInboud.withConnection(task)
55
56 internal fun Connection.closeChannel() = channel().close()
57
58 internal fun Connection.closeChannelAndThen(task: (Future<in Void>) -> Unit) =
59         closeChannel().addListener { task(it) }
60
61 internal fun <T> NettyInbound.closeConnectionAndReturn(returnValue: T): T =
62         withConnectionFrom(this) { it.closeChannel() }.let { returnValue }
63
64 internal fun NettyInbound.createDataStream(): ByteBufFlux = receive().retain()
65
66 //
67 // ClientContext related
68 //
69
70 internal inline fun withNewClientContextFrom(nettyInbound: NettyInbound,
71                                              nettyOutbound: NettyOutbound,
72                                              reactiveTask: (ClientContext) -> Mono<Void>) =
73         ClientContext(nettyOutbound.alloc())
74                 .also { populateClientContextFromInbound(it, nettyInbound) }
75                 .run(reactiveTask)
76
77 internal fun populateClientContextFromInbound(clientContext: ClientContext, nettyInbound: NettyInbound) =
78         withConnectionFrom(nettyInbound) { connection ->
79             clientContext.clientAddress = Try { connection.address().address }.toOption()
80             clientContext.clientCert = connection.getSslSession().flatMap { it.findClientCert() }
81         }