Add SSL/TLS to client simulator 93/58393/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 7 Jun 2018 06:46:51 +0000 (08:46 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 1 Aug 2018 10:34:27 +0000 (12:34 +0200)
Change-Id: Iedebd222be08931b95e52a84c8c4d9c0df9e1da1
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ArgBasedClientConfiguration.kt
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientConfiguration.kt
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientSecurityConfiguration.kt [new file with mode: 0644]
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/impl/VesHvClient.kt
hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/main.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt

index 1bf9046..628afda 100644 (file)
@@ -24,10 +24,15 @@ import org.apache.commons.cli.Options
 import org.apache.commons.cli.DefaultParser
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.HelpFormatter
+import java.io.File
+import java.nio.file.Paths
 
 
 internal object DefaultValues {
     const val MESSAGES_AMOUNT = 1
+    const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
+    const val CERT_FILE = "/etc/ves-hv/client.crt"
+    const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
 }
 
 /**
@@ -56,29 +61,67 @@ internal object ArgBasedClientConfiguration {
             .desc("Amount of messages to send")
             .build()
 
+    private val OPT_PK_FILE = Option.builder("k")
+            .longOpt("private-key-file")
+            .hasArg()
+            .desc("File with client private key in PEM format")
+            .build()
+
+    private val OPT_CERT_FILE = Option.builder("e")
+            .longOpt("cert-file")
+            .hasArg()
+            .desc("File with client certificate bundle")
+            .build()
+
+    private val OPT_TRUST_CERT_FILE = Option.builder("t")
+            .longOpt("trust-cert-file")
+            .hasArg()
+            .desc("File with trusted certificate bundle for trusting servers")
+            .build()
+
     private val options by lazy {
         val options = Options()
         options.addOption(OPT_VES_PORT)
         options.addOption(OPT_VES_HOST)
         options.addOption(OPT_MESSAGES_AMOUNT)
+        options.addOption(OPT_PK_FILE)
+        options.addOption(OPT_CERT_FILE)
+        options.addOption(OPT_TRUST_CERT_FILE)
         options
     }
 
     fun parse(args: Array<out String>): ClientConfiguration {
+
+
         val parser = DefaultParser()
 
         try {
-            parser.parse(options, args).run {
-                return ClientConfiguration(
-                        stringValue(OPT_VES_HOST),
-                        intValue(OPT_VES_PORT),
-                        intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT))
-            }
+            val cmdLine = parser.parse(options, args)
+            val host = cmdLine.stringValue(OPT_VES_HOST)
+            val port = cmdLine.intValue(OPT_VES_PORT)
+            val msgsAmount = cmdLine.intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
+            return ClientConfiguration(
+                    host,
+                    port,
+                    parseSecurityConfig(cmdLine),
+                    msgsAmount)
         } catch (ex: Exception) {
             throw WrongArgumentException(ex)
         }
     }
 
+    private fun parseSecurityConfig(cmdLine: CommandLine): ClientSecurityConfiguration {
+        val pkFile = cmdLine.stringValue(OPT_PK_FILE, DefaultValues.PRIVATE_KEY_FILE)
+        val certFile = cmdLine.stringValue(OPT_CERT_FILE, DefaultValues.CERT_FILE)
+        val trustCertFile = cmdLine.stringValue(OPT_TRUST_CERT_FILE, DefaultValues.TRUST_CERT_FILE)
+        return ClientSecurityConfiguration(
+                privateKey = stringPathToPath(pkFile),
+                cert = stringPathToPath(certFile),
+                trustedCert = stringPathToPath(trustCertFile))
+    }
+
+    private fun stringPathToPath(path: String) = Paths.get(File(path).toURI())
+
     private fun CommandLine.intValueOrDefault(option: Option, default: Int) =
             getOptionValue(option.opt)?.toInt() ?: default
 
@@ -88,12 +131,11 @@ internal object ArgBasedClientConfiguration {
     private fun CommandLine.stringValue(option: Option) =
             getOptionValue(option.opt)
 
+    private fun CommandLine.stringValue(option: Option, default: String) =
+            getOptionValue(option.opt) ?: default
 
-    class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) {
-        fun printMessage() {
-            println(message)
-        }
 
+    class WrongArgumentException(parent: Exception) : Exception(parent.message, parent) {
         fun printHelp(programName: String) {
             val formatter = HelpFormatter()
             formatter.printHelp(programName, options)
index 189dff6..e3cba57 100644 (file)
@@ -23,4 +23,8 @@ package org.onap.dcae.collectors.veshv.main.config
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-data class ClientConfiguration( val vesHost: String, val vesPort: Int ,val messagesAmount: Int)
+data class ClientConfiguration(
+        val vesHost: String,
+        val vesPort: Int,
+        val security: ClientSecurityConfiguration,
+        val messagesAmount: Int)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientSecurityConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org.onap.dcae.collectors.veshv.main/config/ClientSecurityConfiguration.kt
new file mode 100644 (file)
index 0000000..fc7cf66
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.config
+
+import java.nio.file.Path
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+data class ClientSecurityConfiguration(
+        val privateKey: Path,
+        val cert: Path,
+        val trustedCert: Path)
index 108b664..4553ab2 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main.impl
 
-import io.netty.buffer.ByteBufAllocator
+import io.netty.handler.ssl.ClientAuth
+import io.netty.handler.ssl.SslContext
+import io.netty.handler.ssl.SslContextBuilder
+import io.netty.handler.ssl.SslProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.main.config.ClientConfiguration
+import org.onap.dcae.collectors.veshv.main.config.ClientSecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
@@ -37,11 +41,16 @@ import java.util.function.BiFunction
  */
 class VesHvClient(configuration: ClientConfiguration) {
 
-    private val logger = Logger(VesHvClient::class)
-    private val client: TcpClient = TcpClient.create(configuration.vesHost, configuration.vesPort)
+    private val client: TcpClient = TcpClient.builder()
+            .options { opts ->
+                opts.host(configuration.vesHost)
+                        .port(configuration.vesPort)
+                        .sslContext(createSslContext(configuration.security))
+            }
+            .build()
 
     fun send(messages: Flux<WireFrame>) {
-        client.start(BiFunction { i, o -> handler(i, o, messages) })
+        client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
     }
 
     // sending flux with multiple WireFrames not supported yet
@@ -54,8 +63,24 @@ class VesHvClient(configuration: ClientConfiguration) {
                 .asString(Charsets.UTF_8)
                 .subscribe { str -> logger.info("Server response: $str") }
 
+        val frames = messages
+                .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } }
+                .map { it.encode(nettyOutbound.alloc()) }
+
         return nettyOutbound
                 .options { it.flushOnEach() }
-                .send(messages.map { it.encode(ByteBufAllocator.DEFAULT) })
+                .send(frames)
+    }
+
+    private fun createSslContext(config: ClientSecurityConfiguration): SslContext =
+            SslContextBuilder.forClient()
+                    .keyManager(config.cert.toFile(), config.privateKey.toFile())
+                    .trustManager(config.trustedCert.toFile())
+                    .sslProvider(SslProvider.OPENSSL)
+                    .clientAuth(ClientAuth.REQUIRE)
+                    .build()
+
+    companion object {
+        private val logger = Logger(VesHvClient::class)
     }
 }
index cd57568..a41035d 100644 (file)
@@ -31,14 +31,17 @@ private val logger = getLogger("Simulator :: main")
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-fun main(args: Array<String>) = try {
-
-    val clientConfig = ArgBasedClientConfiguration.parse(args)
-    val messageFactory = MessageFactory()
-    val client = VesHvClient(clientConfig)
-    client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
-} catch (e: Exception) {
-    logger.error(e.localizedMessage)
-    logger.debug("An error occurred when starting ves client", e)
+fun main(args: Array<String>) {
+    try {
+        val clientConfig = ArgBasedClientConfiguration.parse(args)
+        val messageFactory = MessageFactory()
+        val client = VesHvClient(clientConfig)
+        client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+    } catch (e: ArgBasedClientConfiguration.WrongArgumentException) {
+        e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
+    } catch (e: Exception) {
+        logger.error(e.localizedMessage)
+        logger.debug("An error occurred when starting ves client", e)
+    }
 }
 
index 9cade1c..535fbe1 100644 (file)
@@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
@@ -39,7 +40,9 @@ internal class VesHvCollector(
         private val sink: Sink) : Collector {
     override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
             dataStream
+                    .doOnNext(this::logIncomingMessage)
                     .flatMap(this::decodeWire)
+                    .doOnNext(this::logDecodedWireMessage)
                     .flatMap(this::decodeProtobuf)
                     .filter(this::validate)
                     .flatMap(this::findRoute)
@@ -47,6 +50,14 @@ internal class VesHvCollector(
                     .doOnNext(this::releaseMemory)
                     .then()
 
+    private fun logIncomingMessage(wire: ByteBuf) {
+        logger.debug { "Got message with total ${wire.readableBytes()} B"}
+    }
+
+    private fun logDecodedWireMessage(payload: ByteBuf) {
+        logger.debug { "Wire payload size: ${payload.readableBytes()} B"}
+    }
+
     private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode)
 
     private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
@@ -65,8 +76,6 @@ internal class VesHvCollector(
         msg.rawMessage.release()
     }
 
-
-
     private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
 
     private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
@@ -78,4 +87,8 @@ internal class VesHvCollector(
             Mono.just(result)
         }
     }
+
+    companion object {
+        val logger = Logger(VesHvCollector::class)
+    }
 }
index 8a34185..0aacb26 100644 (file)
@@ -28,6 +28,9 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import reactor.core.publisher.Flux
 import reactor.ipc.netty.http.client.HttpClient
@@ -41,6 +44,7 @@ import java.nio.ByteBuffer
  */
 object AdapterFactory {
     fun kafkaSink(): SinkProvider = KafkaSinkProvider()
+    fun loggingSink(): SinkProvider = LoggingSinkProvider()
 
     fun staticConfigurationProvider(config: CollectorConfiguration) =
             object : ConfigurationProvider {
@@ -58,8 +62,25 @@ object AdapterFactory {
         }
     }
 
+
+    private class LoggingSinkProvider : SinkProvider {
+        override fun invoke(config: CollectorConfiguration): Sink {
+            return object : Sink {
+                private val logger = Logger(LoggingSinkProvider::class)
+                override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+                        messages
+                                .doOnNext { msg ->
+                                    logger.info { "Message routed to ${msg.topic}" }
+                                }
+                                .map { it.message }
+
+            }
+        }
+    }
+
     fun consulConfigurationProvider(url: String): ConfigurationProvider =
             ConsulConfigurationProvider(url, httpAdapter())
+
     fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
 }
 
index 415aa21..208b1ba 100644 (file)
@@ -59,14 +59,15 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
 
     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
         logger.debug("Got connection")
-        val pipe = collectorProvider().handleConnection(nettyInbound.receive())
 
-        val hello = nettyOutbound
+        val sendHello = nettyOutbound
                 .options { it.flushOnEach() }
                 .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
                 .then()
 
-        return hello.then(pipe)
+        val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive())
+
+        return sendHello.then(handleIncomingMessages)
     }
 
     companion object {
index 906441b..5bd63d8 100644 (file)
@@ -86,7 +86,9 @@ data class WireFrame(val payload: ByteBuf,
             return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
         }
 
-        private const val HEADER_SIZE = 3 + java.lang.Integer.BYTES
+        private const val HEADER_SIZE =
+                3 * java.lang.Byte.BYTES +
+                1 * java.lang.Integer.BYTES
         private const val FF_BYTE: Short = 0xFF
         private const val SUPPORTED_MAJOR_VERSION: Short = 1
     }