Use sdk/hvves-producer in hvves/xnf-simulator
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-xnf-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / xnf / impl / XnfSimulator.kt
index 4dfdb84..812afe1 100644 (file)
@@ -26,12 +26,15 @@ import arrow.core.fix
 import arrow.effects.IO
 import arrow.instances.either.monad.monad
 import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import reactor.core.publisher.toFlux
 import java.io.InputStream
 import javax.json.Json
@@ -42,18 +45,18 @@ import javax.json.JsonArray
  * @since August 2018
  */
 class XnfSimulator(
-        private val vesClient: VesHvClient,
+        private val clientFactory: ClientFactory,
         private val generatorFactory: MessageGeneratorFactory,
         private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
 
+    private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
+    private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+
     fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
             Either.monad<ParsingError>().binding {
-
                 val json = parseJsonArray(messageParameters).bind()
-                messageParametersParser.parse(json).bind()
-                        .toFlux()
-                        .flatMap(::generateMessages)
-                        .let { vesClient.sendIo(it) }
+                val parameters = messageParametersParser.parse(json).bind()
+                simulationFrom(parameters)
             }.fix()
 
     private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
@@ -61,18 +64,23 @@ class XnfSimulator(
                     .toEither()
                     .mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
 
-    private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+    private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
+            .toFlux()
+            .map(::simulate)
+            .then(Mono.just(Unit))
+            .asIo()
+
+    private fun simulate(parameters: MessageParameters): Mono<Unit> =
             when (parameters) {
-                is VesEventParameters -> generatorFactory
-                        .createVesEventGenerator()
-                        .createMessageFlux(parameters)
-                        .map(::encodeToWireFrame)
-                is WireFrameParameters -> generatorFactory
-                        .createWireFrameGenerator()
-                        .createMessageFlux(parameters)
-                else -> throw IllegalStateException("Invalid parameters type")
+                is VesEventParameters -> {
+                    val messages = vesEventGenerator.createMessageFlux(parameters)
+                    val client = clientFactory.create()
+                    client.sendVesEvents(messages)
+                }
+                is WireFrameParameters -> {
+                    val messages = wireFrameGenerator.createMessageFlux(parameters)
+                    val client = clientFactory.create(parameters.wireFrameVersion)
+                    client.sendRawPayload(messages)
+                }
             }
-
-    private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
-            WireFrameMessage(event.toByteArray())
 }