)
}
-
fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<ServerHandle> =
- simulator.listenToTopics(kafkaTopics).map {
+ IO {
+ simulator.listenToTopics(kafkaTopics)
HttpServer.create()
.host(socketAddress.hostName)
.port(socketAddress.port)
.let { NettyServerHandle(it.bindNow()) }
}
-
private fun setRoutes(route: HttpServerRoutes) {
route
.put("/configuration/topics") { req, res ->
req
.receive().aggregate().asString()
.flatMap {
- val option = simulator.listenToTopics(it)
- res.sendOrError(option).then()
+ res.sendOrError{ simulator.listenToTopics(it) }
}
}
.delete("/messages") { _, res ->
logger.info { "Resetting simulator state" }
+
res
.header("Content-type", CONTENT_TEXT)
- .sendOrError(simulator.resetState())
+ .sendOrError { simulator.resetState() }
}
.get("/messages/all/count") { _, res ->
logger.info { "Processing request for count of received messages" }
.post("/messages/all/validate") { req, res ->
req
.receive().aggregate().asInputStream()
- .flatMap { body ->
+ .map {
logger.info { "Processing request for message validation" }
- val response =
- simulator.validate(body)
- .map(::resolveValidationResponse)
- res.sendAndHandleErrors(response).then()
+ simulator.validate(it)
+ .map(::resolveValidationResponse)
+ }
+ .flatMap {
+ res.sendAndHandleErrors(it)
}
}
.get("/healthcheck") { _, res ->