Various improvements
[dcaegen2/collectors/hv-ves.git] / hv-collector-client-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / xnf / impl / HttpServer.kt
index bc7db86..3f872b5 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.exec.Promise
 import ratpack.handling.Chain
 import ratpack.handling.Context
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
+import reactor.core.publisher.Flux
+import reactor.core.scheduler.Schedulers
 import javax.json.Json
 import javax.json.JsonObject
 
@@ -35,30 +39,46 @@ import javax.json.JsonObject
  */
 class HttpServer(private val vesClient: VesHvClient) {
 
-    fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
-        RatpackServer.of {
-            it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
+    fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
+        RatpackServer.start { server ->
+            server.serverConfig(ServerConfig.embedded().port(port))
+                    .handlers(this::configureHandlers)
         }
-    }.doOnNext { it.start() }
+    }
 
 
     private fun configureHandlers(chain: Chain) {
-        chain.post("simulator") { ctx ->
-            ctx.request.body
-                    .map { Json.createReader(it.inputStream).readObject() }
-                    .map { extractMessageParameters(it) }
-                    .map { MessageFactory.INSTANCE.createMessageFlux(it) }
-                    .onError { handleException(it, ctx) }
-                    .then {
-                        vesClient.send(it)
-                        ctx.response
-                                .status(STATUS_OK)
-                                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
-                                        .add("response", "Request accepted")
-                                        .build()
-                                        .toString())
-                    }
-        }
+        chain
+                .post("simulator/sync") { ctx ->
+                    createMessageFlux(ctx)
+                            .map { vesClient.sendIo(it) }
+                            .map { it.unsafeRunSync() }
+                            .onError { handleException(it, ctx) }
+                            .then { sendAcceptedResponse(ctx) }
+                }
+                .post("simulator/async") { ctx ->
+                    createMessageFlux(ctx)
+                            .map { vesClient.sendRx(it) }
+                            .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
+                            .onError { handleException(it, ctx) }
+                            .then { sendAcceptedResponse(ctx) }
+                }
+    }
+
+    private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+        return ctx.request.body
+                .map { Json.createReader(it.inputStream).readObject() }
+                .map { extractMessageParameters(it) }
+                .map { MessageFactory.INSTANCE.createMessageFlux(it) }
+    }
+
+    private fun sendAcceptedResponse(ctx: Context) {
+        ctx.response
+                .status(STATUS_OK)
+                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+                        .add("response", "Request accepted")
+                        .build()
+                        .toString())
     }
 
     private fun handleException(t: Throwable, ctx: Context) {