4553ab20552ff990565e1637a5b3959f1f4e0568
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.main.impl
21
22 import io.netty.handler.ssl.ClientAuth
23 import io.netty.handler.ssl.SslContext
24 import io.netty.handler.ssl.SslContextBuilder
25 import io.netty.handler.ssl.SslProvider
26 import org.onap.dcae.collectors.veshv.domain.WireFrame
27 import org.onap.dcae.collectors.veshv.main.config.ClientConfiguration
28 import org.onap.dcae.collectors.veshv.main.config.ClientSecurityConfiguration
29 import org.onap.dcae.collectors.veshv.utils.logging.Logger
30 import org.reactivestreams.Publisher
31 import reactor.core.publisher.Flux
32 import reactor.ipc.netty.NettyInbound
33 import reactor.ipc.netty.NettyOutbound
34 import reactor.ipc.netty.tcp.TcpClient
35 import java.util.function.BiFunction
36
37
38 /**
39  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
40  * @since June 2018
41  */
42 class VesHvClient(configuration: ClientConfiguration) {
43
44     private val client: TcpClient = TcpClient.builder()
45             .options { opts ->
46                 opts.host(configuration.vesHost)
47                         .port(configuration.vesPort)
48                         .sslContext(createSslContext(configuration.security))
49             }
50             .build()
51
52     fun send(messages: Flux<WireFrame>) {
53         client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
54     }
55
56     // sending flux with multiple WireFrames not supported yet
57     private fun handler(nettyInbound: NettyInbound,
58                         nettyOutbound: NettyOutbound,
59                         messages: Flux<WireFrame>): Publisher<Void> {
60
61         nettyInbound
62                 .receive()
63                 .asString(Charsets.UTF_8)
64                 .subscribe { str -> logger.info("Server response: $str") }
65
66         val frames = messages
67                 .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } }
68                 .map { it.encode(nettyOutbound.alloc()) }
69
70         return nettyOutbound
71                 .options { it.flushOnEach() }
72                 .send(frames)
73     }
74
75     private fun createSslContext(config: ClientSecurityConfiguration): SslContext =
76             SslContextBuilder.forClient()
77                     .keyManager(config.cert.toFile(), config.privateKey.toFile())
78                     .trustManager(config.trustedCert.toFile())
79                     .sslProvider(SslProvider.OPENSSL)
80                     .clientAuth(ClientAuth.REQUIRE)
81                     .build()
82
83     companion object {
84         private val logger = Logger(VesHvClient::class)
85     }
86 }