2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
22 import arrow.effects.IO
23 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
24 import org.onap.dcae.collectors.veshv.utils.logging.Logger
25 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
26 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
27 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
28 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
29 import ratpack.exec.Promise
30 import ratpack.handling.Chain
31 import ratpack.handling.Context
32 import ratpack.server.RatpackServer
33 import ratpack.server.ServerConfig
34 import reactor.core.publisher.Flux
35 import reactor.core.scheduler.Schedulers
36 import java.nio.charset.Charset
37 import javax.json.Json
38 import javax.json.JsonArray
/**
 * Ratpack-based HTTP front end of the xNF simulator.
 *
 * Registers two POST endpoints, `simulator/sync` and `simulator/async`. Both read a JSON array
 * from the request body, convert it into message parameters, ask [MessageGenerator] for the
 * corresponding message stream and pass that stream to [vesClient].
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 */
internal class HttpServer(private val vesClient: XnfSimulator) {

    /**
     * Describes starting the HTTP server; nothing happens until the returned [IO] is run.
     *
     * @param port TCP port to listen on, [DEFAULT_PORT] when omitted
     * @return an [IO] yielding the started [RatpackServer]
     */
    fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
        RatpackServer.start { server ->
            server.serverConfig(ServerConfig.embedded().port(port))
                    .handlers(this::configureHandlers)
        }
    }

    /**
     * Registers the request handlers on the given Ratpack [chain].
     */
    private fun configureHandlers(chain: Chain) {
        chain
                .post("simulator/sync") { ctx ->
                    createMessageFlux(ctx)
                            .map { vesClient.sendIo(it) }
                            // unsafeRunSync() blocks until the IO completes, so it must not run on
                            // a Ratpack compute thread; blockingMap moves it to a blocking thread.
                            .blockingMap { it.unsafeRunSync() }
                            .onError { handleException(it, ctx) }
                            .then { sendAcceptedResponse(ctx) }
                }
                .post("simulator/async") { ctx ->
                    createMessageFlux(ctx)
                            .map { vesClient.sendRx(it) }
                            // Fire and forget: the subscription handle is discarded and the
                            // response is sent without waiting for the stream to complete.
                            .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
                            .onError { handleException(it, ctx) }
                            .then { sendAcceptedResponse(ctx) }
                }
    }

    /**
     * Reads the request body as a JSON array and maps it to a stream of messages to be sent.
     *
     * Failures (malformed JSON, invalid parameters) surface as a failed [Promise].
     */
    private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> =
            ctx.request.body
                    // JsonReader is Closeable; `use` releases it even when parsing fails.
                    .map { body -> Json.createReader(body.inputStream).use { it.readArray() } }
                    .map { extractMessageParameters(it) }
                    .map { MessageGenerator.INSTANCE.createMessageFlux(it) }

    /**
     * Converts every element of [request] into [MessageParameters].
     *
     * Each element is expected to be a JSON object with `domain`, `messageType` and
     * `messagesAmount` entries.
     *
     * @throws ValidationException when any element cannot be converted; the original
     * exception is kept as the cause
     */
    private fun extractMessageParameters(request: JsonArray): List<MessageParameters> =
            try {
                request
                        .map { it.asJsonObject() }
                        .map {
                            val domain = Domain.valueOf(it.getString("domain"))
                            val messageType = MessageType.valueOf(it.getString("messageType"))
                            val messagesAmount = it.getJsonNumber("messagesAmount").longValue()
                            MessageParameters(domain, messageType, messagesAmount)
                        }
            } catch (e: Exception) {
                throw ValidationException("Validating request body failed", e)
            }

    /**
     * Sends the JSON confirmation that the request has been accepted.
     */
    private fun sendAcceptedResponse(ctx: Context) {
        ctx.response
                .status(STATUS_OK)
                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
                        .add("response", "Request accepted")
                        .build()
                        .toString())
    }

    /**
     * Logs the failure and answers with [STATUS_BAD_REQUEST] and a JSON description of it.
     */
    private fun handleException(t: Throwable, ctx: Context) {
        // localizedMessage is null for message-less exceptions and JsonObjectBuilder.add rejects
        // null values, so fall back to the exception's string form.
        val reason = t.localizedMessage ?: t.toString()
        logger.warn("Failed to process the request - $reason")
        logger.debug("Exception thrown when processing the request", t)
        ctx.response
                .status(STATUS_BAD_REQUEST)
                .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
                        .add("response", "Request was not accepted")
                        .add("exception", reason)
                        .build()
                        .toString())
    }

    companion object {
        private val logger = Logger(HttpServer::class)
        const val DEFAULT_PORT = 5000
        const val STATUS_OK = 200
        const val STATUS_BAD_REQUEST = 400
        const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
    }
}
/**
 * Thrown when the body of a simulator request cannot be turned into message parameters.
 *
 * @param message description of the failure, may be null
 * @param cause the exception that made the validation fail
 */
internal class ValidationException(
        message: String?,
        cause: Exception
) : Exception(message, cause)