*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.exec.Promise
import ratpack.handling.Chain
import ratpack.handling.Context
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
+import reactor.core.publisher.Flux
+import reactor.core.scheduler.Schedulers
import javax.json.Json
import javax.json.JsonObject
*/
class HttpServer(private val vesClient: VesHvClient) {
- fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
- RatpackServer.of {
- it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
+ fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(this::configureHandlers)
}
- }.doOnNext { it.start() }
+ }
private fun configureHandlers(chain: Chain) {
- chain.post("simulator") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readObject() }
- .map { extractMessageParameters(it) }
- .map { MessageFactory.INSTANCE.createMessageFlux(it) }
- .onError { handleException(it, ctx) }
- .then {
- vesClient.send(it)
- ctx.response
- .status(STATUS_OK)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request accepted")
- .build()
- .toString())
- }
- }
+ chain
+ .post("simulator/sync") { ctx ->
+ createMessageFlux(ctx)
+ .map { vesClient.sendIo(it) }
+ .map { it.unsafeRunSync() }
+ .onError { handleException(it, ctx) }
+ .then { sendAcceptedResponse(ctx) }
+ }
+ .post("simulator/async") { ctx ->
+ createMessageFlux(ctx)
+ .map { vesClient.sendRx(it) }
+ .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
+ .onError { handleException(it, ctx) }
+ .then { sendAcceptedResponse(ctx) }
+ }
+ }
+
+ private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+ return ctx.request.body
+ .map { Json.createReader(it.inputStream).readObject() }
+ .map { extractMessageParameters(it) }
+ .map { MessageFactory.INSTANCE.createMessageFlux(it) }
+ }
+
+ private fun sendAcceptedResponse(ctx: Context) {
+ ctx.response
+ .status(STATUS_OK)
+ .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+ .add("response", "Request accepted")
+ .build()
+ .toString())
}
private fun handleException(t: Throwable, ctx: Context) {