0ab248b9c162790dae063a066ec031ed979c3b9a
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
21
22 import arrow.effects.IO
23 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
24 import org.onap.dcae.collectors.veshv.utils.logging.Logger
25 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
26 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
27 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
28 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
29 import ratpack.exec.Promise
30 import ratpack.handling.Chain
31 import ratpack.handling.Context
32 import ratpack.server.RatpackServer
33 import ratpack.server.ServerConfig
34 import reactor.core.publisher.Flux
35 import reactor.core.scheduler.Schedulers
36 import java.nio.charset.Charset
37 import javax.json.Json
38 import javax.json.JsonArray
39
40 /**
41  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
42  * @since June 2018
43  */
44 internal class HttpServer(private val vesClient: XnfSimulator) {
45
46     fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
47         RatpackServer.start { server ->
48             server.serverConfig(ServerConfig.embedded().port(port))
49                     .handlers(this::configureHandlers)
50         }
51     }
52
53     private fun configureHandlers(chain: Chain) {
54         chain
55                 .post("simulator/sync") { ctx ->
56                     createMessageFlux(ctx)
57                             .map { vesClient.sendIo(it) }
58                             .map { it.unsafeRunSync() }
59                             .onError { handleException(it, ctx) }
60                             .then { sendAcceptedResponse(ctx) }
61                 }
62                 .post("simulator/async") { ctx ->
63                     createMessageFlux(ctx)
64                             .map { vesClient.sendRx(it) }
65                             .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
66                             .onError { handleException(it, ctx) }
67                             .then { sendAcceptedResponse(ctx) }
68                 }
69     }
70
71     private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
72         return ctx.request.body
73                 .map { Json.createReader(it.inputStream).readArray() }
74                 .map { extractMessageParameters(it) }
75                 .map { MessageGenerator.INSTANCE.createMessageFlux(it) }
76     }
77
78     private fun extractMessageParameters(request: JsonArray): List<MessageParameters> =
79             try {
80                 request
81                         .map { it.asJsonObject() }
82                         .map {
83
84                             val domain = Domain.valueOf(it.getString("domain"))
85                             val messageType = MessageType.valueOf(it.getString("messageType"))
86                             val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
87                             MessageParameters(domain, messageType, messagesAmount)
88                         }
89             } catch (e: Exception) {
90                 throw ValidationException("Validating request body failed", e)
91             }
92
93     private fun sendAcceptedResponse(ctx: Context) {
94         ctx.response
95                 .status(STATUS_OK)
96                 .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
97                         .add("response", "Request accepted")
98                         .build()
99                         .toString())
100     }
101
102     private fun handleException(t: Throwable, ctx: Context) {
103         logger.warn("Failed to process the request - ${t.localizedMessage}")
104         logger.debug("Exception thrown when processing the request", t)
105         ctx.response
106                 .status(STATUS_BAD_REQUEST)
107                 .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
108                         .add("response", "Request was not accepted")
109                         .add("exception", t.localizedMessage)
110                         .build()
111                         .toString())
112     }
113
114     companion object {
115         private val logger = Logger(HttpServer::class)
116         const val DEFAULT_PORT = 5000
117         const val STATUS_OK = 200
118         const val STATUS_BAD_REQUEST = 400
119         const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
120     }
121 }
122
123 internal class ValidationException(message: String?, cause: Exception) : Exception(message, cause)