Fix ssl related bug in xnf simulator 65/79465/5
authorJakub Dudycz <jakub.dudycz@nokia.com>
Fri, 1 Mar 2019 16:39:09 +0000 (17:39 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Tue, 5 Mar 2019 08:08:12 +0000 (09:08 +0100)
Fix bug when xnf simnulator was using same SecurityKeys object instance for every new VesClient,
which resulted in fault while trying to connect to collector.

With new implementation simulator reuses same HvVesProdcuer from SDK
for every VesEvent request received and creates new Producer for every
WireFrameEvent request. This allows to continue testing cases in which
there is need to assert if connection was dropped from malicious client.

Change-Id: I5f51a58de85cccf7de6ab2392f86259502be31dd
Issue-ID: DCAEGEN2-1291
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
pom.xml
sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt

diff --git a/pom.xml b/pom.xml
index dfccb2b..41a1c45 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
         <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
         <jacoco.version>0.8.2</jacoco.version>
         <detekt.version>1.0.0-RC11</detekt.version>
-        <sdk.version>1.1.2-SNAPSHOT</sdk.version>
+        <sdk.version>1.1.3-SNAPSHOT</sdk.version>
 
         <!-- Protocol buffers -->
         <protobuf.version>3.6.1</protobuf.version>
index fb14263..478713e 100644 (file)
@@ -42,14 +42,19 @@ const val KEY_STORE_FILE = "/etc/ves-hv/server.p12"
 const val TRUST_STORE_FILE = "/etc/ves-hv/trust.p12"
 
 fun createSecurityConfiguration(cmdLine: CommandLine): Try<SecurityConfiguration> =
-        if (cmdLine.hasOption(CommandLineOption.SSL_DISABLE))
-            Try { disabledSecurityConfiguration() }
+        createSecurityConfigurationProvider(cmdLine).map { it() }
+
+fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> SecurityConfiguration> =
+        if (shouldDisableSsl(cmdLine))
+            Try { { disabledSecurityConfiguration() } }
         else
-            enabledSecurityConfiguration(cmdLine)
+            Try { { enabledSecurityConfiguration(cmdLine) } }
+
+private fun shouldDisableSsl(cmdLine: CommandLine) = cmdLine.hasOption(CommandLineOption.SSL_DISABLE)
 
 private fun disabledSecurityConfiguration() = SecurityConfiguration(keys = None)
 
-private fun enabledSecurityConfiguration(cmdLine: CommandLine) = Try {
+private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
     val ksFile = cmdLine.stringValue(CommandLineOption.KEY_STORE_FILE, KEY_STORE_FILE)
     val ksPass = cmdLine.stringValue(CommandLineOption.KEY_STORE_PASSWORD).getOrElse { "" }
     val tsFile = cmdLine.stringValue(CommandLineOption.TRUST_STORE_FILE, TRUST_STORE_FILE)
@@ -62,7 +67,8 @@ private fun enabledSecurityConfiguration(cmdLine: CommandLine) = Try {
             .trustStorePassword(Passwords.fromString(tsPass))
             .build()
 
-    SecurityConfiguration(keys = Some(keys))
+    return SecurityConfiguration(keys = Some(keys))
 }
 
+
 private fun pathFromFile(file: String) = Paths.get(file)
index 53a8826..93c4317 100644 (file)
@@ -26,6 +26,7 @@ import arrow.core.fix
 import arrow.effects.IO
 import arrow.instances.either.monad.monad
 import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
 import org.onap.dcae.collectors.veshv.utils.arrow.asIo
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
@@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass
+import reactor.core.Disposable
+import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.core.publisher.toFlux
 import java.io.InputStream
+import java.nio.ByteBuffer
 import javax.json.Json
 import javax.json.JsonArray
 
@@ -52,6 +58,8 @@ class XnfSimulator(
     private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
     private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
 
+    private val defaultHvVesClient by lazy { clientFactory.create() }
+
     fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
             Either.monad<ParsingError>().binding {
                 val json = parseJsonArray(messageParameters).bind()
@@ -64,23 +72,43 @@ class XnfSimulator(
                     .toEither()
                     .mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
 
-    private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
-            .toFlux()
-            .flatMap(::simulate)
-            .then(Mono.just(Unit))
-            .asIo()
 
-    private fun simulate(parameters: MessageParameters): Mono<Unit> =
+    private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+            parameters
+                    .map(::asClientToMessages)
+                    .groupMessagesByClients()
+                    .flattenValuesToFlux()
+                    .toList()
+                    .toFlux()
+                    .map(::simulate)
+                    .then(Mono.just(Unit))
+                    .asIo()
+
+    private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
+            groupBy({ it.first }, { it.second })
+
+    private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> =
+            mapValues { Flux.concat(it.value) }
+
+    private fun asClientToMessages(parameters: MessageParameters) =
             when (parameters) {
                 is VesEventParameters -> {
-                    val messages = vesEventGenerator.createMessageFlux(parameters)
-                    val client = clientFactory.create()
-                    client.sendVesEvents(messages)
+                    val messages = vesEventGenerator
+                            .createMessageFlux(parameters)
+                            .map(VesEventOuterClass.VesEvent::toByteBuffer)
+                    Pair(defaultHvVesClient, messages)
                 }
                 is WireFrameParameters -> {
                     val messages = wireFrameGenerator.createMessageFlux(parameters)
                     val client = clientFactory.create(parameters.wireFrameVersion)
-                    client.sendRawPayload(messages)
+                    Pair(client, messages)
                 }
             }
+
+    private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable =
+            pair.first
+                    .sendRawPayload(pair.second, PayloadType.PROTOBUF)
+                    .subscribe()
 }
+
+internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer()
index afc157c..1957943 100644 (file)
@@ -23,10 +23,10 @@ import org.onap.dcae.collectors.veshv.utils.arrow.then
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
 import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
-import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -34,15 +34,11 @@ import java.nio.ByteBuffer
  */
 class HvVesClient(private val producer: HvVesProducer) {
 
-    fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
-            producer.send(messages)
-                    .then { logger.info { "Ves Events have been sent" } }
+    fun sendRawPayload(messages: Flux<ByteBuffer>, payloadType: PayloadType = PayloadType.UNDEFINED): Mono<Unit> =
+            producer.sendRaw(messages, payloadType)
+                    .then { logger.info { "Producer sent raw messages with payload type ${payloadType}" } }
 
 
-    fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
-            producer.sendRaw(messages, PayloadType.UNDEFINED)
-                    .then { logger.info { "Raw messages have been sent" } }
-
     companion object {
         private val logger = Logger(HvVesClient::class)
     }
index b5751a3..0891e49 100644 (file)
@@ -26,19 +26,19 @@ import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider
 import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
 import org.onap.dcae.collectors.veshv.utils.commandline.intValue
 import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -71,14 +71,14 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
                 val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
                         WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
 
-                val security = createSecurityConfiguration(cmdLine)
-                    .doOnFailure { ex ->
-                        logger.withError {
-                            log("Could not read security keys", ex)
+                val security = createSecurityConfigurationProvider(cmdLine)
+                        .doOnFailure { ex ->
+                            logger.withError {
+                                log("Could not read security keys", ex)
+                            }
                         }
-                    }
-                    .toOption()
-                    .bind()
+                        .toOption()
+                        .bind()
 
                 SimulatorConfiguration(
                         InetSocketAddress(listenPort),
index 1db66f1..e9fecd6 100644 (file)
@@ -28,4 +28,4 @@ import java.net.InetSocketAddress
  * @since February 2019
  */
 data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
-                               val security: SecurityConfiguration)
+                               val securityProvider: () -> SecurityConfiguration)
index 5a0e73c..0021ed8 100644 (file)
@@ -31,4 +31,4 @@ data class SimulatorConfiguration(
         val healthCheckApiListenAddress: InetSocketAddress,
         val hvVesAddress: InetSocketAddress,
         val maxPayloadSizeBytes: Int,
-        val security: SecurityConfiguration)
+        val securityProvider: () -> SecurityConfiguration)
index a91fccd..72a1165 100644 (file)
@@ -29,23 +29,25 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since February 2019
  */
-class ClientFactory(configuration: ClientConfiguration) {
+class ClientFactory(private val configuration: ClientConfiguration) {
 
-    private val partialConfig = ImmutableProducerOptions
+    fun create() = hvVesClient(partialConfiguration().build())
+
+    fun create(wireFrameVersion: WireFrameVersion) = hvVesClient(
+            partialConfiguration()
+                    .wireFrameVersion(wireFrameVersion)
+                    .build())
+
+    private fun partialConfiguration() = ImmutableProducerOptions
             .builder()
             .collectorAddresses(configuration.collectorAddresses)
-            .let { producerOptions ->
-                configuration.security.keys.fold(
-                        { producerOptions },
-                        { producerOptions.securityKeys(it) })
+            .let { options ->
+                configuration.securityProvider().keys.fold(
+                        { options },
+                        { options.securityKeys(it) })
             }
 
-    fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
-            buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
-
-
-    fun create(): HvVesClient = buildClient(partialConfig)
+    private fun hvVesClient(producerOptions: ImmutableProducerOptions) =
+            HvVesClient(HvVesProducerFactory.create(producerOptions))
 
-    private fun buildClient(config: ImmutableProducerOptions.Builder) =
-            HvVesClient(HvVesProducerFactory.create(config.build()))
 }
index 366c7e6..baa231c 100644 (file)
@@ -67,7 +67,7 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
         IO.monad().binding {
             logger.info { "Using configuration: $config" }
             XnfHealthCheckServer().startServer(config).bind()
-            val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
+            val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
             val xnfSimulator = XnfSimulator(
                     ClientFactory(clientConfig),
                     MessageGeneratorFactory(config.maxPayloadSizeBytes)
index daf3061..1406153 100644 (file)
@@ -44,17 +44,6 @@ internal class HvVesClientTest : Spek({
         val hvVesProducer: HvVesProducer = mock()
         val cut = HvVesClient(hvVesProducer)
 
-        describe("handling ves events stream") {
-
-            val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>()
-            whenever(hvVesProducer.send(any())).thenReturn(Mono.empty())
-            cut.sendVesEvents(vesEvents)
-
-            it("should perform sending operation") {
-                verify(hvVesProducer).send(vesEvents)
-            }
-        }
-
         describe("handling raw message stream") {
 
             val rawMessages = Flux.empty<ByteBuffer>()
index 123f12a..29281cd 100644 (file)
@@ -23,6 +23,7 @@ import arrow.core.Left
 import arrow.core.None
 import arrow.core.Right
 import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
 import com.nhaarman.mockitokotlin2.mock
 import com.nhaarman.mockitokotlin2.verify
 import com.nhaarman.mockitokotlin2.whenever
@@ -39,6 +40,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParamete
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
 import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
 import org.onap.ves.VesEventOuterClass
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.core.publisher.Flux
@@ -120,13 +122,14 @@ internal class XnfSimulatorTest : Spek({
             whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator)
             whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages)
             whenever(clientFactory.create()).thenReturn(vesClient)
-            whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit))
+
+            whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit))
 
             // when
             cut.startSimulation(json).map { it.unsafeRunSync() }
 
             // then
-            verify(vesClient).sendVesEvents(generatedMessages)
+            verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF))
         }
     }
 })
\ No newline at end of file