*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.http.*
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendOrError
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
import reactor.netty.http.server.HttpServer
)
}
- fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
- IO {
+ fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): Mono<ServerHandle> =
+ Mono.defer {
simulator.listenToTopics(kafkaTopics)
HttpServer.create()
.host(socketAddress.hostName)
.port(socketAddress.port)
.route(::setRoutes)
- .let { NettyServerHandle(it.bindNow()) }
+ .bind()
+ .map { NettyServerHandle(it) }
}
private fun setRoutes(route: HttpServerRoutes) {
req
.receive().aggregate().asString()
.flatMap {
- res.sendOrError{ simulator.listenToTopics(it) }
+ res.sendOrError { simulator.listenToTopics(it) }
}
}
.delete("/messages") { _, res ->