6c830b9d5cc63b11b4d30c53d2b46d0757d89e1f
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
21
22 import arrow.core.Left
23 import arrow.core.Right
24 import arrow.effects.IO
25 import arrow.effects.fix
26 import arrow.effects.monad
27 import arrow.typeclasses.binding
28 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
29 import org.onap.dcae.collectors.veshv.utils.logging.Logger
30 import ratpack.exec.Promise
31 import ratpack.handling.Chain
32 import ratpack.handling.Context
33 import ratpack.http.Response
34 import ratpack.server.RatpackServer
35 import ratpack.server.ServerConfig
36
/**
 * HTTP API of the DCAE application simulator, served by an embedded Ratpack server.
 *
 * Routes registered by [setupHandlers]:
 *  - `PUT configuration/topics` - passes the request body text to `simulator.listenToTopics`,
 *  - `DELETE messages` - runs `simulator.resetState()`,
 *  - `GET messages/all/count` - sends `messagesCount` of the current simulator state as plain text,
 *    or HTTP 404 when `simulator.state()` folds to its first (presumably "no state") case,
 *  - `POST messages/all/validate` - passes the request body stream to `simulator.validate`;
 *    HTTP 200 when it yields `true`, HTTP 400 otherwise,
 *  - `GET healthcheck` - always HTTP 200.
 *
 * Any failure of an [IO] executed on behalf of a request is logged and reported as HTTP 500.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
class ApiServer(private val simulator: DcaeAppSimulator) {

    /**
     * Asks the simulator to listen to [kafkaTopics] and chains the start of the HTTP server
     * onto the result of that operation.
     *
     * @param port TCP port the embedded Ratpack server is configured with
     * @param kafkaTopics topics handed to `simulator.listenToTopics`
     * @return an [IO] yielding the started [RatpackServer]
     */
    fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
            simulator.listenToTopics(kafkaTopics).map {
                RatpackServer.start { server ->
                    server.serverConfig(ServerConfig.embedded().port(port))
                            .handlers(::setupHandlers)
                }
            }

    /**
     * Registers all HTTP routes of the simulator API on the given [chain].
     */
    private fun setupHandlers(chain: Chain) {
        chain
                .put("configuration/topics") { ctx ->
                    val operation = ctx.bodyIo().flatMap { body ->
                        simulator.listenToTopics(body.text)
                    }
                    ctx.response.sendOrError(operation)
                }
                .delete("messages") { ctx ->
                    ctx.response.contentType(CONTENT_TEXT)
                    ctx.response.sendOrError(simulator.resetState())
                }
                .get("messages/all/count") { ctx ->
                    simulator.state().fold(
                            {
                                // status() only sets the code; the response has to be sent
                                // explicitly, otherwise the handler finishes without replying.
                                ctx.response.status(STATUS_NOT_FOUND).send()
                            },
                            {
                                ctx.response
                                        .contentType(CONTENT_TEXT)
                                        .send(it.messagesCount.toString())
                            })
                }
                .post("messages/all/validate") { ctx ->
                    val responseStatus = IO.monad().binding {
                        val body = ctx.bodyIo().bind()
                        val isValid = simulator.validate(body.inputStream).bind()
                        if (isValid)
                            STATUS_OK
                        else
                            STATUS_BAD_REQUEST
                    }.fix()

                    ctx.response.sendStatusOrError(responseStatus)
                }
                .get("healthcheck") { ctx ->
                    ctx.response.status(STATUS_OK).send()
                }
    }

    /** Request body of this context wrapped into an [IO]. */
    private fun Context.bodyIo() = request.body.asIo()

    /**
     * Adapts a Ratpack [Promise] to an [IO]: a promise error is emitted as [Left],
     * a promised value as [Right].
     */
    private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult ->
        onError {
            emitResult(Left(it))
        }.then { result ->
            emitResult(Right(result))
        }
    }

    /**
     * Runs [responseStatus] and replies with HTTP 200 on success or HTTP 500 on failure.
     */
    private fun Response.sendOrError(responseStatus: IO<Unit>) {
        sendStatusOrError(responseStatus.map { STATUS_OK })
    }

    /**
     * Runs [responseStatus] asynchronously and replies with the status code it yields.
     *
     * When the [IO] fails, the error is logged and HTTP 500 is sent with a plain-text body
     * describing the error.
     */
    private fun Response.sendStatusOrError(responseStatus: IO<Int>) {
        responseStatus.unsafeRunAsync { cb ->
            cb.fold(
                    { err ->
                        logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err)
                        // Throwable.message is nullable; fall back to toString() so that
                        // a null body is never handed to send().
                        status(STATUS_INTERNAL_SERVER_ERROR)
                                .send(CONTENT_TEXT, err.message ?: err.toString())
                    },
                    {
                        status(it).send()
                    }
            )
        }
    }

    companion object {
        private val logger = Logger(ApiServer::class)
        private const val CONTENT_TEXT = "text/plain"

        private const val STATUS_OK = 200
        private const val STATUS_BAD_REQUEST = 400
        private const val STATUS_NOT_FOUND = 404
        private const val STATUS_INTERNAL_SERVER_ERROR = 500
    }
}