import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
import javax.json.Json
* @since August 2018
*/
class XnfSimulator(
- private val vesClient: VesHvClient,
+ private val clientFactory: ClientFactory,
private val generatorFactory: MessageGeneratorFactory,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
+ private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
+ private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
-
val json = parseJsonArray(messageParameters).bind()
- messageParametersParser.parse(json).bind()
- .toFlux()
- .flatMap(::generateMessages)
- .let { vesClient.sendIo(it) }
+ val parameters = messageParametersParser.parse(json).bind()
+ simulationFrom(parameters)
}.fix()
private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun simulate(parameters: MessageParameters): Mono<Unit> =
when (parameters) {
- is VesEventParameters -> generatorFactory
- .createVesEventGenerator()
- .createMessageFlux(parameters)
- .map(::encodeToWireFrame)
- is WireFrameParameters -> generatorFactory
- .createWireFrameGenerator()
- .createMessageFlux(parameters)
- else -> throw IllegalStateException("Invalid parameters type")
+ is VesEventParameters -> {
+ val messages = vesEventGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create()
+ client.sendVesEvents(messages)
+ }
+ is WireFrameParameters -> {
+ val messages = wireFrameGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create(parameters.wireFrameVersion)
+ client.sendRawPayload(messages)
+ }
}
-
- private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
- WireFrameMessage(event.toByteArray())
}