2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
22 import arrow.core.Left
23 import arrow.core.Right
24 import arrow.effects.IO
25 import arrow.effects.fix
26 import arrow.effects.monad
27 import arrow.typeclasses.binding
28 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
29 import org.onap.dcae.collectors.veshv.utils.logging.Logger
30 import ratpack.exec.Promise
31 import ratpack.handling.Chain
32 import ratpack.handling.Context
33 import ratpack.http.Response
34 import ratpack.server.RatpackServer
35 import ratpack.server.ServerConfig
38 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// HTTP API server for the DCAE application simulator.
// The request handlers registered by this class call into the injected `simulator`
// (listenToTopics, resetState, state, validate) and translate the outcome into an HTTP response.
41 class ApiServer(private val simulator: DcaeAppSimulator) {
/**
 * Calls `simulator.listenToTopics` with [kafkaTopics] and maps its result to a
 * Ratpack server started with an embedded configuration on [port].
 *
 * @param port port number passed to the embedded server configuration
 * @param kafkaTopics topic names handed to `simulator.listenToTopics`
 * @return an [IO] whose value is the [RatpackServer] returned by `RatpackServer.start`
 */
44 fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
45 simulator.listenToTopics(kafkaTopics).map {
// The server is started inside the map, i.e. only as part of the listenToTopics IO.
46 RatpackServer.start { server ->
47 server.serverConfig(ServerConfig.embedded().port(port))
// Routes are registered by setupHandlers.
48 .handlers(::setupHandlers)
/**
 * Registers the HTTP routes of the simulator API on the given chain:
 *
 *  - `PUT configuration/topics`: passes the request body text to `simulator.listenToTopics`
 *  - `DELETE messages`: runs `simulator.resetState()`
 *  - `GET messages/all/count`: reports `messagesCount` of the current simulator state as text
 *  - `POST messages/all/validate`: passes the request body stream to `simulator.validate`
 *  - `GET healthcheck`: responds with STATUS_OK and no body
 *
 * @param chain Ratpack handler chain the routes are added to
 */
52 private fun setupHandlers(chain: Chain) {
54 .put("configuration/topics") { ctx ->
// Read the body as IO, then feed its text to the simulator.
55 val operation = ctx.bodyIo().flatMap { body ->
56 simulator.listenToTopics(body.text)
// sendOrError responds with STATUS_OK when the operation succeeds.
58 ctx.response.sendOrError(operation)
61 .delete("messages") { ctx ->
62 ctx.response.contentType(CONTENT_TEXT)
63 ctx.response.sendOrError(simulator.resetState())
65 .get("messages/all/count") { ctx ->
// state() is folded over two branches; presumably the first one handles a missing state — confirm against DcaeAppSimulator.
66 simulator.state().fold(
// NOTE(review): this branch only sets the status; no send() call is visible on this line — confirm the response is actually completed.
67 { ctx.response.status(STATUS_NOT_FOUND) },
// Second branch: reply with the message count rendered as plain text.
70 .contentType(CONTENT_TEXT)
71 .send(it.messagesCount.toString())
74 .post("messages/all/validate") { ctx ->
// Monadic binding: read the body, then run validation on its input stream.
75 val responseStatus = IO.monad().binding {
76 val body = ctx.bodyIo().bind()
77 val isValid = simulator.validate(body.inputStream).bind()
// NOTE(review): the mapping from isValid to an HTTP status is not visible in this excerpt; presumably STATUS_OK / STATUS_BAD_REQUEST — confirm.
84 ctx.response.sendStatusOrError(responseStatus)
86 .get("healthcheck") { ctx ->
87 ctx.response.status(STATUS_OK).send()
/** Exposes the request body of this context as an [IO] by converting `request.body` with [asIo]. */
91 private fun Context.bodyIo() = request.body.asIo()
/**
 * Adapts this Ratpack [Promise] to an Arrow [IO] using `IO.async`.
 *
 * The promised value is passed to the async callback as a `Right`.
 * NOTE(review): the error path is not visible in this excerpt; presumably failures
 * are emitted as `Left` — confirm.
 *
 * @return an [IO] that completes with the promise's result
 */
93 private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult ->
// Successful completion of the promise.
97 emitResult(Right(result))
/**
 * Sends a response for an operation that yields no value: a successful
 * [responseStatus] is mapped to STATUS_OK and the rest is delegated to [sendStatusOrError].
 *
 * @param responseStatus operation whose completion decides the response
 */
101 private fun Response.sendOrError(responseStatus: IO<Unit>) {
102 sendStatusOrError(responseStatus.map { STATUS_OK })
/**
 * Runs [responseStatus] with `unsafeRunAsync` and answers on this response.
 *
 * On error a warning is logged and STATUS_INTERNAL_SERVER_ERROR is sent with the
 * error message as plain text.
 * NOTE(review): the success branch is not visible in this excerpt; presumably it
 * sends the computed status code — confirm.
 *
 * @param responseStatus operation yielding the HTTP status code to send
 */
105 private fun Response.sendStatusOrError(responseStatus: IO<Int>) {
106 responseStatus.unsafeRunAsync { cb ->
// Failure path: log the error and report it to the client as HTTP 500.
109 logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err)
110 status(ApiServer.STATUS_INTERNAL_SERVER_ERROR)
// err.message becomes the plain-text response body.
111 .send(CONTENT_TEXT, err.message)
// Logger used when reporting failed operations.
121 private val logger = Logger(ApiServer::class)
// Content type used for textual response bodies.
122 private const val CONTENT_TEXT = "text/plain"
// HTTP status codes used by the handlers of this class.
124 private const val STATUS_OK = 200
125 private const val STATUS_BAD_REQUEST = 400
126 private const val STATUS_NOT_FOUND = 404
127 private const val STATUS_INTERNAL_SERVER_ERROR = 500