2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.xnf.impl
22 import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
23 import org.onap.dcae.collectors.veshv.utils.logging.Logger
24 import ratpack.handling.Chain
25 import ratpack.handling.Context
26 import ratpack.server.RatpackServer
27 import ratpack.server.ServerConfig
28 import reactor.core.publisher.Mono
29 import javax.json.Json
30 import javax.json.JsonObject
33 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
/**
 * HTTP entry point of the xNF simulator, built on Ratpack.
 *
 * Registers a POST route named "simulator" whose JSON request body is turned into
 * [MessageParameters] and then into a message flux via [MessageFactory].
 *
 * NOTE(review): several lines of this class are not visible in the reviewed text (the call
 * that builds the server, the receiver of the promise chain, the success branch and the
 * closing braces). The comments below describe only the statements that are visible.
 * Presumably the generated messages are handed to `vesClient` - confirm against the full file.
 */
36 class HttpServer(private val vesClient: VesHvClient) {
/**
 * Creates the HTTP server inside `Mono.fromCallable` and calls `start()` on the emitted
 * server in `doOnNext`.
 *
 * @param port port passed to the embedded server config; defaults to [DEFAULT_PORT]
 * @return a [Mono] of the [RatpackServer]
 */
38 fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
// Server definition: embedded config bound to `port`, routes supplied by configureHandlers.
40 it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
42 }.doOnNext { it.start() }
/**
 * Adds the POST "simulator" handler to the given Ratpack [chain].
 */
45 private fun configureHandlers(chain: Chain) {
46 chain.post("simulator") { ctx ->
// NOTE(review): the receiver of this chain is not visible here - presumably the request body.
// Step 1: read the payload as a JSON object.
48 .map { Json.createReader(it.inputStream).readObject() }
// Step 2: validate the JSON and convert it to MessageParameters.
49 .map { extractMessageParameters(it) }
// Step 3: build the message flux described by those parameters.
50 .map { MessageFactory.createMessageFlux(it) }
// A failure in any step above is reported to the client by handleException.
51 .onError { handleException(it, ctx) }
// Success reply: JSON body with a "response" field.
// NOTE(review): the status set for this reply is not visible; STATUS_OK is declared below
// and is presumably used here - confirm.
56 .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
57 .add("response", "Request accepted")
/**
 * Logs the failure and replies with [STATUS_BAD_REQUEST] and a JSON body that carries the
 * exception message.
 *
 * @param t the error raised while processing the request
 * @param ctx Ratpack context of the failed request
 */
64 private fun handleException(t: Throwable, ctx: Context) {
// Short message at warn level; the full throwable only at debug level.
65 logger.warn("Failed to process the request - ${t.localizedMessage}")
66 logger.debug("Exception thrown when processing the request", t)
// NOTE(review): the receiver of `.status(...)` is not visible - presumably ctx.response.
68 .status(STATUS_BAD_REQUEST)
69 .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
70 .add("response", "Request was not accepted")
// NOTE(review): localizedMessage can be null for a throwable created without a message;
// verify that the JSON builder accepts that.
71 .add("exception", t.localizedMessage)
/**
 * Builds [MessageParameters] from the request JSON.
 *
 * Reads the "commonEventHeader" object (parsed by `MessageFactory.parseCommonHeader`) and the
 * "messagesAmount" number (taken as a long).
 *
 * @param request parsed JSON request body
 * @return the parameters describing the messages to generate
 * @throws ValidationException wrapping any [Exception] raised while reading the fields
 */
76 private fun extractMessageParameters(request: JsonObject): MessageParameters =
78 val commonEventHeader = MessageFactory
79 .parseCommonHeader(request.getJsonObject("commonEventHeader"))
80 val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
81 MessageParameters(commonEventHeader, messagesAmount)
82 } catch (e: Exception) {
// Keep the original exception as the cause so it still shows up in the debug log.
83 throw ValidationException("Validating request body failed", e)
// Logger and HTTP constants used above.
// NOTE(review): presumably declared in a companion object whose header is not visible here.
88 private val logger = Logger(HttpServer::class)
// Port used by start() when the caller does not pass one.
89 const val DEFAULT_PORT = 5000
90 const val STATUS_OK = 200
91 const val STATUS_BAD_REQUEST = 400
92 const val CONTENT_TYPE_APPLICATION_JSON = "application/json"
/**
 * Raised when a simulator request body fails validation.
 *
 * @param message description of the validation failure; may be null
 * @param cause the exception that made validation fail
 */
internal class ValidationException(
        message: String?,
        cause: Exception
) : Exception(message, cause)