Use sdk/hvves-producer in hvves/xnf-simulator 68/78768/3
authorJakub Dudycz <jakub.dudycz@nokia.com>
Tue, 19 Feb 2019 17:06:33 +0000 (18:06 +0100)
committerJakub Dudycz <jakub.dudycz@nokia.com>
Thu, 21 Feb 2019 11:54:54 +0000 (12:54 +0100)
Change-Id: I8f493b0edd2cbaef136a22d914ad24198bb63a7f
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1253

25 files changed:
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
sources/hv-collector-ves-message-generator/pom.xml
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/MessageGenerator.kt [moved from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt with 91% similarity]
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt [moved from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGenerator.kt with 61% similarity]
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/VesEventGenerator.kt [moved from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGenerator.kt with 95% similarity]
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt [moved from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParser.kt with 96% similarity]
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserImpl.kt
sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt [moved from sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGenerator.kt with 99% similarity]
sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/raw/RawMessageGeneratorTest.kt [moved from sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/wireframe/WireFrameGeneratorTest.kt with 64% similarity]
sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/CommonEventHeaderParserTest.kt
sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/PayloadGeneratorTest.kt
sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/vesevent/VesEventGeneratorTest.kt
sources/hv-collector-xnf-simulator/pom.xml
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt [new file with mode: 0644]
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt [deleted file]
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt [new file with mode: 0644]
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt [new file with mode: 0644]
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt [new file with mode: 0644]
sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt

index 5d9a7cf..47a2d22 100644 (file)
@@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
 import java.io.InputStream
index 8fb1b2e..bff7709 100644 (file)
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
index 290ef72..5682522 100644 (file)
@@ -23,11 +23,10 @@ import arrow.core.Either
 import arrow.core.Left
 import arrow.core.Right
 import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monadError.monadError
-import arrow.typeclasses.binding
+import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
 import kotlin.system.exitProcess
 
 /**
@@ -62,6 +61,9 @@ fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
     })
 }
 
+fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
+        toMono().then(Mono.fromCallable(callback))
+
 fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
         flatMap { io ->
             io.attempt().unsafeRunSync().fold(
index 29e32f4..e676dfa 100644 (file)
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk</groupId>
+            <artifactId>hvvesclient-producer-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java-util</artifactId>
index 82b79c0..a718716 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.api
 
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_WIRE_FRAME
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableWireFrameVersion
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion
 import org.onap.ves.VesEventOuterClass
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
-abstract class MessageParameters(val amount: Long = -1)
+sealed class MessageParameters
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since February 2019
  */
 class WireFrameParameters(val messageType: WireFrameType,
-                          amount: Long = -1) : MessageParameters(amount)
+                          val amount: Long = -1) : MessageParameters() {
+
+    val wireFrameVersion: WireFrameVersion
+        get() = ImmutableWireFrameVersion.builder().let {
+            if (messageType == INVALID_WIRE_FRAME)
+                it.major(UNSUPPORTED_MAJOR_VERSION)
+            else
+                it
+        }.build()
+
+    companion object {
+        private const val UNSUPPORTED_MAJOR_VERSION: Short = 2
+    }
+}
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -40,4 +56,4 @@ class WireFrameParameters(val messageType: WireFrameType,
  */
 class VesEventParameters(val commonEventHeader: VesEventOuterClass.CommonEventHeader,
                          val messageType: VesEventType,
-                         amount: Long = -1) : MessageParameters(amount)
+                         val amount: Long = -1) : MessageParameters()
index aa47379..613f9bd 100644 (file)
@@ -19,9 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.ves.message.generator.factory
 
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.PayloadGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.VesEventGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireFrameGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -30,5 +30,5 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe.WireF
 class MessageGeneratorFactory(private val maxPayloadSizeBytes: Int) {
     fun createVesEventGenerator() = VesEventGenerator(PayloadGenerator(), maxPayloadSizeBytes)
 
-    fun createWireFrameGenerator() = WireFrameGenerator()
+    fun createWireFrameGenerator() = RawMessageGenerator()
 }
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe
+package org.onap.dcae.collectors.veshv.ves.message.generator.generators
 
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.PayloadContentType
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import io.netty.buffer.Unpooled
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.INVALID_GPB_DATA
@@ -30,37 +27,29 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.IN
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
 import java.nio.charset.Charset
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since February 2019
  */
-class WireFrameGenerator : MessageGenerator<WireFrameParameters, WireFrameMessage>() {
+class RawMessageGenerator : MessageGenerator<WireFrameParameters, ByteBuffer>() {
 
-    override fun createMessageFlux(parameters: WireFrameParameters): Flux<WireFrameMessage> =
+    override fun createMessageFlux(parameters: WireFrameParameters): Flux<ByteBuffer> =
             parameters.run {
                 Mono
                         .fromCallable { createMessage(messageType) }
                         .let { repeatMessage(it, amount) }
             }
 
-    private fun createMessage(messageType: WireFrameType): WireFrameMessage =
+    private fun createMessage(messageType: WireFrameType): ByteBuffer =
             when (messageType) {
-                INVALID_WIRE_FRAME -> {
-                    val payload = ByteData(VesEvent.getDefaultInstance().toByteArray())
-                    WireFrameMessage(
-                            payload,
-                            UNSUPPORTED_VERSION,
-                            UNSUPPORTED_VERSION,
-                            PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
-                            payload.size())
-                }
-                INVALID_GPB_DATA ->
-                    WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+                INVALID_WIRE_FRAME -> wrap(VesEvent.getDefaultInstance().toByteArray())
+                INVALID_GPB_DATA -> wrap("invalid vesEvent".toByteArray(Charset.defaultCharset()))
             }
 
-    companion object {
-        private const val UNSUPPORTED_VERSION: Short = 2
-    }
+    private fun wrap(bytes: ByteArray) = Unpooled.wrappedBuffer(bytes).nioBuffer()
+
+
 }
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
+package org.onap.dcae.collectors.veshv.ves.message.generator.generators
 
 import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.FIXED_PAYLOAD
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.TOO_BIG_PAYLOAD
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
 
 import arrow.core.Option
 import com.google.protobuf.util.JsonFormat
@@ -29,7 +29,7 @@ import javax.json.JsonObject
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since July 2018
  */
-class CommonEventHeaderParser {
+internal class CommonEventHeaderParser {
     fun parse(json: JsonObject): Option<CommonEventHeader> = Option.fromNullable(
             CommonEventHeader.newBuilder()
                     .apply { JsonFormat.parser().merge(json.toString(), this) }
index 174a01f..7d6087c 100644 (file)
@@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType.Com
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType.Companion.isWireFrameType
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.vesevent.CommonEventHeaderParser
 import javax.json.JsonArray
 import javax.json.JsonObject
 import javax.json.JsonValue
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2010 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.wireframe
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl.raw
 
 import com.google.protobuf.InvalidProtocolBufferException
 import org.assertj.core.api.Assertions
@@ -25,13 +25,13 @@ import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.RawMessageGenerator
 import org.onap.ves.VesEventOuterClass
 import reactor.test.test
-import kotlin.test.assertTrue
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -40,7 +40,7 @@ import kotlin.test.assertTrue
 object WireFrameGeneratorTest : Spek({
 
     val maxPayloadSizeBytes = 1024
-    val cut = WireFrameGenerator()
+    val cut = RawMessageGenerator()
 
     on("message type requesting invalid GPB data ") {
         it("should createVesEventGenerator flux of messages with invalid payload") {
@@ -50,32 +50,17 @@ object WireFrameGeneratorTest : Spek({
                     ))
                     .test()
                     .assertNext {
-                        assertTrue(it.validate().isRight())
-                        assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
-                        Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
-                                .isThrownBy { extractCommonEventHeader(it.payload) }
-                    }
-                    .verifyComplete()
-        }
-    }
+                        val decodedBytes = it.array().toString(Charset.defaultCharset())
+                        assertThat(decodedBytes).isEqualTo("invalid vesEvent")
+                        assertThat(it.capacity()).isLessThan(maxPayloadSizeBytes)
 
-    on("message type requesting invalid wire frame ") {
-        it("should createVesEventGenerator flux of messages with invalid version") {
-            cut
-                    .createMessageFlux(WireFrameParameters(
-                            WireFrameType.INVALID_WIRE_FRAME, 1
-                    ))
-                    .test()
-                    .assertNext {
-                        assertTrue(it.validate().isLeft())
-                        assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
-                        assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
+                        Assertions.assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+                                .isThrownBy { extractCommonEventHeader(it) }
                     }
                     .verifyComplete()
         }
     }
-
 })
 
-fun extractCommonEventHeader(bytes: ByteData): VesEventOuterClass.CommonEventHeader =
-        VesEventOuterClass.VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+private fun extractCommonEventHeader(bytes: ByteBuffer): VesEventOuterClass.CommonEventHeader =
+        VesEventOuterClass.VesEvent.parseFrom(bytes).commonEventHeader
index 04222d1..09635af 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.CommonEventHeaderParser
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import java.io.ByteArrayInputStream
 import javax.json.Json
index 2d77bb9..4558bb1 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
 
 object PayloadGeneratorTest : Spek({
 
index 2f13c52..fa99bfb 100644 (file)
@@ -28,8 +28,10 @@ import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
 import reactor.test.test
 
 /**
index a813410..69ca53b 100644 (file)
             <groupId>org.onap.dcaegen2.services.sdk</groupId>
             <artifactId>hvvesclient-producer-impl</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.onap.dcaegen2.services.sdk</groupId>
-            <artifactId>hvvesclient-producer-api</artifactId>
-        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>hv-collector-test-utils</artifactId>
index 4dfdb84..812afe1 100644 (file)
@@ -26,12 +26,15 @@ import arrow.core.fix
 import arrow.effects.IO
 import arrow.instances.either.monad.monad
 import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import reactor.core.publisher.toFlux
 import java.io.InputStream
 import javax.json.Json
@@ -42,18 +45,18 @@ import javax.json.JsonArray
  * @since August 2018
  */
 class XnfSimulator(
-        private val vesClient: VesHvClient,
+        private val clientFactory: ClientFactory,
         private val generatorFactory: MessageGeneratorFactory,
         private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
 
+    private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
+    private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+
     fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
             Either.monad<ParsingError>().binding {
-
                 val json = parseJsonArray(messageParameters).bind()
-                messageParametersParser.parse(json).bind()
-                        .toFlux()
-                        .flatMap(::generateMessages)
-                        .let { vesClient.sendIo(it) }
+                val parameters = messageParametersParser.parse(json).bind()
+                simulationFrom(parameters)
             }.fix()
 
     private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
@@ -61,18 +64,23 @@ class XnfSimulator(
                     .toEither()
                     .mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
 
-    private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+    private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
+            .toFlux()
+            .map(::simulate)
+            .then(Mono.just(Unit))
+            .asIo()
+
+    private fun simulate(parameters: MessageParameters): Mono<Unit> =
             when (parameters) {
-                is VesEventParameters -> generatorFactory
-                        .createVesEventGenerator()
-                        .createMessageFlux(parameters)
-                        .map(::encodeToWireFrame)
-                is WireFrameParameters -> generatorFactory
-                        .createWireFrameGenerator()
-                        .createMessageFlux(parameters)
-                else -> throw IllegalStateException("Invalid parameters type")
+                is VesEventParameters -> {
+                    val messages = vesEventGenerator.createMessageFlux(parameters)
+                    val client = clientFactory.create()
+                    client.sendVesEvents(messages)
+                }
+                is WireFrameParameters -> {
+                    val messages = wireFrameGenerator.createMessageFlux(parameters)
+                    val client = clientFactory.create(parameters.wireFrameVersion)
+                    client.sendRawPayload(messages)
+                }
             }
-
-    private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
-            WireFrameMessage(event.toByteArray())
 }
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
new file mode 100644 (file)
index 0000000..afc157c
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class HvVesClient(private val producer: HvVesProducer) {
+
+    fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
+            producer.send(messages)
+                    .then { logger.info { "Ves Events have been sent" } }
+
+
+    fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
+            producer.sendRaw(messages, PayloadType.UNDEFINED)
+                    .then { logger.info { "Raw messages have been sent" } }
+
+    companion object {
+        private val logger = Logger(HvVesClient::class)
+    }
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
deleted file mode 100644 (file)
index eba8ed8..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
-
-import arrow.core.Option
-import arrow.core.getOrElse
-import io.netty.handler.ssl.SslContext
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpClient
-import reactor.util.concurrent.Queues.XS_BUFFER_SIZE
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-class VesHvClient(private val configuration: SimulatorConfiguration) {
-
-    private val client: TcpClient = TcpClient.create()
-            .addressSupplier { configuration.hvVesAddress }
-            .configureSsl()
-
-    private fun TcpClient.configureSsl() =
-            createSslContext(configuration.security)
-                    .map { sslContext -> this.secure(sslContext) }
-                    .getOrElse { this }
-
-    fun sendIo(messages: Flux<WireFrameMessage>) =
-            sendRx(messages).then(Mono.just(Unit)).asIo()
-
-    private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
-        val complete = ReplayProcessor.create<Void>(1)
-        client
-                .handle { _, output -> handler(complete, messages, output) }
-                .connect()
-                .doOnError {
-                    logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
-                }
-                .subscribe {
-                    logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
-                }
-        return complete.then()
-    }
-
-    private fun handler(complete: ReplayProcessor<Void>,
-                        messages: Flux<WireFrameMessage>,
-                        nettyOutbound: NettyOutbound): Publisher<Void> {
-
-        val allocator = nettyOutbound.alloc()
-        val encoder = WireFrameEncoder(allocator)
-        val frames = messages
-                .map(encoder::encode)
-                .window(XS_BUFFER_SIZE)
-
-        return nettyOutbound
-                .logConnectionClosed()
-                .options { it.flushOnBoundary() }
-                .sendGroups(frames)
-                .then {
-                    logger.info { "Messages have been sent" }
-                    complete.onComplete()
-                }
-                .then()
-    }
-
-    private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
-            SslContextFactory().createClientContext(config)
-
-    private fun NettyOutbound.logConnectionClosed() =
-            withConnection { conn ->
-                conn.onDispose {
-                    logger.info { "Connection to ${conn.address()} has been closed" }
-                }
-            }
-
-    companion object {
-        private val logger = Logger(VesHvClient::class)
-    }
-}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
new file mode 100644 (file)
index 0000000..1db66f1
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
+
+import io.vavr.collection.Set
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
+                               val security: SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
new file mode 100644 (file)
index 0000000..a91fccd
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+class ClientFactory(configuration: ClientConfiguration) {
+
+    private val partialConfig = ImmutableProducerOptions
+            .builder()
+            .collectorAddresses(configuration.collectorAddresses)
+            .let { producerOptions ->
+                configuration.security.keys.fold(
+                        { producerOptions },
+                        { producerOptions.securityKeys(it) })
+            }
+
+    fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
+            buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
+
+
+    fun create(): HvVesClient = buildClient(partialConfig)
+
+    private fun buildClient(config: ImmutableProducerOptions.Builder) =
+            HvVesClient(HvVesProducerFactory.create(config.build()))
+}
index ef62730..366c7e6 100644 (file)
@@ -23,15 +23,17 @@ import arrow.effects.IO
 import arrow.effects.fix
 import arrow.effects.instances.io.monad.monad
 import arrow.typeclasses.binding
+import io.vavr.collection.HashSet
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
@@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
         IO.monad().binding {
             logger.info { "Using configuration: $config" }
             XnfHealthCheckServer().startServer(config).bind()
+            val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
             val xnfSimulator = XnfSimulator(
-                    VesHvClient(config),
+                    ClientFactory(clientConfig),
                     MessageGeneratorFactory(config.maxPayloadSizeBytes)
             )
             XnfApiServer(xnfSimulator, OngoingSimulations())
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
new file mode 100644 (file)
index 0000000..daf3061
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
+import org.onap.ves.VesEventOuterClass
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+internal class HvVesClientTest : Spek({
+    describe("HvVesClient") {
+        val hvVesProducer: HvVesProducer = mock()
+        val cut = HvVesClient(hvVesProducer)
+
+        describe("handling ves events stream") {
+
+            val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>()
+            whenever(hvVesProducer.send(any())).thenReturn(Mono.empty())
+            cut.sendVesEvents(vesEvents)
+
+            it("should perform sending operation") {
+                verify(hvVesProducer).send(vesEvents)
+            }
+        }
+
+        describe("handling raw message stream") {
+
+            val rawMessages = Flux.empty<ByteBuffer>()
+            whenever(hvVesProducer.sendRaw(any(), any())).thenReturn(Mono.empty())
+            cut.sendRawPayload(rawMessages)
+
+            it("should perform sending operation") {
+                verify(hvVesProducer).sendRaw(eq(rawMessages), any())
+            }
+        }
+    }
+})
\ No newline at end of file
index 192725b..123f12a 100644 (file)
@@ -21,18 +21,28 @@ package org.onap.dcae.collectors.veshv.main
 
 import arrow.core.Left
 import arrow.core.None
+import arrow.core.Right
 import com.nhaarman.mockitokotlin2.any
 import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
 import com.nhaarman.mockitokotlin2.whenever
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
 import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.ves.VesEventOuterClass
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import java.io.ByteArrayInputStream
 
 /**
@@ -41,15 +51,15 @@ import java.io.ByteArrayInputStream
  */
 internal class XnfSimulatorTest : Spek({
     lateinit var cut: XnfSimulator
-    lateinit var vesClient: VesHvClient
+    lateinit var clientFactory: ClientFactory
     lateinit var messageParametersParser: MessageParametersParser
     lateinit var generatorFactory: MessageGeneratorFactory
 
     beforeEachTest {
-        vesClient = mock()
+        clientFactory = mock()
         messageParametersParser = mock()
         generatorFactory = mock()
-        cut = XnfSimulator(vesClient, generatorFactory, messageParametersParser)
+        cut = XnfSimulator(clientFactory, generatorFactory, messageParametersParser)
     }
 
     describe("startSimulation") {
@@ -89,22 +99,34 @@ internal class XnfSimulatorTest : Spek({
             assertThat(result).left().isEqualTo(cause)
         }
 
-        // TODO uncomment and fix this test after introducing HvVesProducer from onap SDK in XnfSimulator
-//        it("should return generated messages") {
-//            // given
-//            val json = "[true]".byteInputStream()
-//            val messageParams = listOf<MessageParameters>()
-//            val generatedMessages = Flux.empty<WireFrameMessage>()
-//            val sendingIo = IO {}
-//            whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
-//            whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
-//            whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
-//
-//            // when
-//            val result = cut.startSimulation(json)
-//
-//            // then
-//            assertThat(result).right().isSameAs(sendingIo)
-//        }
+        it("should return generated ves messages") {
+            // given
+            val vesEventGenerator: VesEventGenerator = mock()
+            val vesClient: HvVesClient = mock()
+
+            val json = "[true]".byteInputStream()
+
+            val vesEventParams = VesEventParameters(
+                    CommonEventHeader.getDefaultInstance(),
+                    VesEventType.VALID,
+                    1
+            )
+            val messageParams = listOf(vesEventParams)
+
+            val generatedMessages = Flux.empty<VesEventOuterClass.VesEvent>()
+
+
+            whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+            whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator)
+            whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages)
+            whenever(clientFactory.create()).thenReturn(vesClient)
+            whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit))
+
+            // when
+            cut.startSimulation(json).map { it.unsafeRunSync() }
+
+            // then
+            verify(vesClient).sendVesEvents(generatedMessages)
+        }
     }
 })
\ No newline at end of file