c545ac8dd9945680521a90017eec441e1abbbf97
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
21
22 import arrow.effects.IO
23 import org.onap.dcae.collectors.veshv.domain.WireFrame
24 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
25 import org.onap.dcae.collectors.veshv.utils.logging.Logger
26 import ratpack.exec.Promise
27 import ratpack.handling.Chain
28 import ratpack.handling.Context
29 import ratpack.server.RatpackServer
30 import ratpack.server.ServerConfig
31 import reactor.core.publisher.Flux
32 import reactor.core.scheduler.Schedulers
33 import javax.json.Json
34 import javax.json.JsonObject
35
36 /**
37  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
38  * @since June 2018
39  */
40 internal class HttpServer(private val vesClient: VesHvClient) {
41
42     fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
43         RatpackServer.start { server ->
44             server.serverConfig(ServerConfig.embedded().port(port))
45                     .handlers(this::configureHandlers)
46         }
47     }
48
49
50     private fun configureHandlers(chain: Chain) {
51         chain
52                 .post("simulator/sync") { ctx ->
53                     createMessageFlux(ctx)
54                             .map { vesClient.sendIo(it) }
55                             .map { it.unsafeRunSync() }
56                             .onError { handleException(it, ctx) }
57                             .then { sendAcceptedResponse(ctx) }
58                 }
59                 .post("simulator/async") { ctx ->
60                     createMessageFlux(ctx)
61                             .map { vesClient.sendRx(it) }
62                             .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
63                             .onError { handleException(it, ctx) }
64                             .then { sendAcceptedResponse(ctx) }
65                 }
66     }
67
68     private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
69         return ctx.request.body
70                 .map { Json.createReader(it.inputStream).readObject() }
71                 .map { extractMessageParameters(it) }
72                 .map { MessageGeneratorImpl.INSTANCE.createMessageFlux(it) }
73     }
74
75     private fun sendAcceptedResponse(ctx: Context) {
76         ctx.response
77                 .status(STATUS_OK)
78                 .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
79                         .add("response", "Request accepted")
80                         .build()
81                         .toString())
82     }
83
84     private fun handleException(t: Throwable, ctx: Context) {
85         logger.warn("Failed to process the request - ${t.localizedMessage}")
86         logger.debug("Exception thrown when processing the request", t)
87         ctx.response
88                 .status(STATUS_BAD_REQUEST)
89                 .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
90                         .add("response", "Request was not accepted")
91                         .add("exception", t.localizedMessage)
92                         .build()
93                         .toString())
94     }
95
96     private fun extractMessageParameters(request: JsonObject): MessageParameters =
97             try {
98                 val commonEventHeader = MessageGeneratorImpl.INSTANCE
99                         .parseCommonHeader(request.getJsonObject("commonEventHeader"))
100                 val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
101                 MessageParameters(commonEventHeader, messagesAmount)
102             } catch (e: Exception) {
103                 throw ValidationException("Validating request body failed", e)
104             }
105
106
107     companion object {
108         private val logger = Logger(HttpServer::class)
109         const val DEFAULT_PORT = 5000
110         const val STATUS_OK = 200
111         const val STATUS_BAD_REQUEST = 400
112         const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
113     }
114 }
115
116 internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause)