2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
22 import arrow.effects.IO
23 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
24 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
25 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
26 import org.onap.dcae.collectors.veshv.utils.logging.Logger
27 import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser
28 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
29 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
30 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
31 import org.onap.ves.VesEventV5.VesEvent
32 import ratpack.handling.Chain
33 import ratpack.handling.Context
34 import ratpack.server.RatpackServer
35 import ratpack.server.ServerConfig
36 import reactor.core.publisher.Mono
37 import javax.json.Json
40 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
43 class ApiServer(private val consumerFactory: ConsumerFactory,
44 private val messageParametersParser: MessageParametersParser = MessageParametersParser()) {
46 private lateinit var consumerState: ConsumerStateProvider
48 fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
49 consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
50 RatpackServer.start { server ->
51 server.serverConfig(ServerConfig.embedded().port(port))
52 .handlers(this::setupHandlers)
56 private fun setupHandlers(chain: Chain) {
58 .put("configuration/topics") { ctx ->
59 ctx.request.body.then { it ->
60 val topics = extractTopics(it.text)
61 logger.info("Received new configuration. Creating consumer for topics: $topics")
62 consumerState = consumerFactory.createConsumerForTopics(topics)
69 .delete("messages") { ctx ->
70 ctx.response.contentType(CONTENT_TEXT)
74 { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
75 { ctx.response.status(STATUS_OK) }
79 .get("messages/all/count") { ctx ->
80 val state = consumerState.currentState()
82 .contentType(CONTENT_TEXT)
83 .send(state.messagesCount.toString())
85 .post("messages/all/validate") { ctx ->
87 .map { Json.createReader(it.inputStream).readArray() }
88 .map { messageParametersParser.parse(it) }
89 .map { generateEvents(ctx, it) }
90 .then { (generatedEvents, shouldValidatePayloads) ->
92 .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
96 .get("healthcheck") { ctx ->
97 ctx.response.status(STATUS_OK).send()
101 private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
102 Pair<Mono<List<VesEvent>>, Boolean> = Pair(
104 doGenerateEvents(parameters).doOnError {
105 logger.error("Error occurred when generating messages: $it")
107 .status(STATUS_INTERNAL_SERVER_ERROR)
110 parameters.all { it.messageType == FIXED_PAYLOAD }
113 private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
114 .createMessageFlux(parameters)
115 .map(PayloadWireFrameMessage::payload)
116 .map { decode(it.unsafeAsArray()) }
120 private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
123 private fun sendResponse(ctx: Context,
124 generatedEvents: List<VesEvent>,
125 shouldValidatePayloads: Boolean) =
126 resolveResponseStatusCode(
127 generated = generatedEvents,
128 consumed = decodeConsumedEvents(),
129 validatePayloads = shouldValidatePayloads
130 ).let { ctx.response.status(it).send() }
133 private fun decodeConsumedEvents(): List<VesEvent> = consumerState
139 private fun resolveResponseStatusCode(generated: List<VesEvent>,
140 consumed: List<VesEvent>,
141 validatePayloads: Boolean): Int =
142 if (validatePayloads) {
143 if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
145 validateHeaders(consumed, generated)
148 private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
149 val consumedHeaders = consumed.map { it.commonEventHeader }
150 val generatedHeaders = generated.map { it.commonEventHeader }
151 return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
154 private fun extractTopics(it: String): Set<String> =
155 it.substringAfter("=")
160 private val logger = Logger(ApiServer::class)
161 private const val CONTENT_TEXT = "text/plain"
163 private const val STATUS_OK = 200
164 private const val STATUS_BAD_REQUEST = 400
165 private const val STATUS_INTERNAL_SERVER_ERROR = 500