2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
22 import arrow.effects.IO
23 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
24 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
25 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
26 import org.onap.dcae.collectors.veshv.utils.logging.Logger
27 import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser
28 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
29 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
30 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
31 import org.onap.ves.VesEventV5.VesEvent
32 import ratpack.handling.Chain
33 import ratpack.handling.Context
34 import ratpack.server.RatpackServer
35 import ratpack.server.ServerConfig
36 import reactor.core.publisher.Mono
37 import javax.json.Json
40 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// HTTP API of the DCAE app simulator, served by a Ratpack server (see start()).
// consumerFactory creates consumer state providers for a given set of Kafka topics;
// messageParametersParser turns the JSON array posted to "messages/all/validate"
// into message parameters.
43 class ApiServer(private val consumerFactory: ConsumerFactory,
44 private val messageParametersParser: MessageParametersParser = MessageParametersParser()) {
// Provider of the current consumer state. Assigned in start() and replaced whenever
// new topics arrive through PUT "configuration/topics"; reading it before start()
// has run would fail because it is lateinit.
46 private lateinit var consumerState: ConsumerStateProvider
/**
 * Creates the initial consumer for [kafkaTopics] and starts a Ratpack server on [port].
 *
 * Both steps are wrapped in an [IO] value that is returned to the caller.
 *
 * @param port TCP port passed to the embedded server configuration
 * @param kafkaTopics topics handed to the consumer factory
 * @return an [IO] yielding the started [RatpackServer]
 */
48 fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
// Initialise the consumer state before the server is started.
49 consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
50 RatpackServer.start { server ->
// Embedded server configuration bound to the requested port.
51 server.serverConfig(ServerConfig.embedded().port(port))
// Routes are registered by setupHandlers.
52 .handlers(this::setupHandlers)
/**
 * Registers the HTTP routes of the simulator on the given Ratpack [chain]:
 *
 * - PUT `configuration/topics` - replaces the consumer with one for the posted topics
 * - DELETE `messages` - responds with status 500 or 200 (see the note in the handler)
 * - GET `messages/all/count` - returns the number of messages in the current consumer state
 * - POST `messages/all/validate` - generates events from the posted parameters and
 *   responds through sendResponse
 *
 * NOTE(review): several statements of this function are not visible in this listing
 * (e.g. the chain receiver and some response calls); confirm against the full file.
 */
56 private fun setupHandlers(chain: Chain) {
// PUT: reconfigure the consumed topics.
58 .put("configuration/topics") { ctx ->
59 ctx.request.body.then { it ->
// Topics are parsed from the request body text.
60 val topics = extractTopics(it.text)
61 logger.info("Received new configuration. Creating consumer for topics: $topics")
// The previous consumer state provider is replaced by a new one for these topics.
62 consumerState = consumerFactory.createConsumerForTopics(topics)
// DELETE: response is declared as plain text.
69 .delete("messages") { ctx ->
70 ctx.response.contentType(CONTENT_TEXT)
// Two alternatives: the first selects status 500, the second status 200.
// NOTE(review): the call receiving these lambdas is not visible here; presumably a
// fold over the outcome of resetting the consumer - confirm.
74 { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
75 { ctx.response.status(STATUS_OK) }
// GET: report how many messages the current consumer state holds, as plain text.
79 .get("messages/all/count") { ctx ->
80 val state = consumerState.currentState()
82 .contentType(CONTENT_TEXT)
83 .send(state.messagesCount.toString())
// POST: compare generated events against the consumed ones.
85 .post("messages/all/validate") { ctx ->
// Read the body as a JSON array, parse it into message parameters, then build the
// (events, shouldValidatePayloads) pair.
87 .map { Json.createReader(it.inputStream).readArray() }
88 .map { messageParametersParser.parse(it) }
89 .map { generateEvents(ctx, it) }
// The pair returned by generateEvents is destructured here.
90 .then { (generatedEvents, shouldValidatePayloads) ->
// Once generation succeeds, the response status is resolved and sent by sendResponse.
92 .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
/**
 * Builds the input for validation: a [Mono] with the events generated from [parameters],
 * paired with a flag telling whether every parameter set uses the FIXED_PAYLOAD message type.
 *
 * @param ctx request context; used on the error path of event generation
 * @param parameters message parameters parsed from the request
 * @return pair of (generated events, true when all parameters are FIXED_PAYLOAD)
 */
98 private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
99 Pair<Mono<List<VesEvent>>, Boolean> = Pair(
// First component: generated events; a generation error is logged and answered with
// the internal-server-error status.
101 doGenerateEvents(parameters).doOnError {
102 logger.error("Error occurred when generating messages: $it")
// NOTE(review): the receiver of this call is not visible in this listing;
// presumably ctx.response - confirm.
104 .status(STATUS_INTERNAL_SERVER_ERROR)
// Second component: true only when every parameter set has the FIXED_PAYLOAD type.
107 parameters.all { it.messageType == FIXED_PAYLOAD }
/**
 * Generates messages for [parameters] with the shared [MessageGenerator] instance and
 * decodes the payload of each wire frame into a [VesEvent].
 *
 * NOTE(review): the step that gathers the stream into a `Mono<List<VesEvent>>` is not
 * visible in this listing; confirm against the full file.
 */
110 private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
111 .createMessageFlux(parameters)
// Keep only the payload of each wire frame message.
112 .map(PayloadWireFrameMessage::payload)
// Payload bytes are parsed back into VesEvent objects.
113 .map { decode(it.unsafeAsArray()) }
/**
 * Deserializes raw bytes into a [VesEvent] by delegating to `VesEvent.parseFrom`.
 *
 * @param bytes serialized form of a single event
 * @return the parsed event
 */
private fun decode(bytes: ByteArray): VesEvent {
    return VesEvent.parseFrom(bytes)
}
/**
 * Resolves the HTTP status for a validation request and sends it as the response.
 *
 * The generated events are compared with the events decoded from the consumer state;
 * the comparison mode is chosen by [shouldValidatePayloads].
 *
 * @param ctx request context whose response is sent
 * @param generatedEvents events produced from the request parameters
 * @param shouldValidatePayloads passed on as the `validatePayloads` flag
 */
private fun sendResponse(ctx: Context,
                         generatedEvents: List<VesEvent>,
                         shouldValidatePayloads: Boolean) {
    val consumedEvents = decodeConsumedEvents()
    val statusCode = resolveResponseStatusCode(
            generated = generatedEvents,
            consumed = consumedEvents,
            validatePayloads = shouldValidatePayloads
    )
    ctx.response.status(statusCode).send()
}
/**
 * Produces the list of [VesEvent]s derived from the current `consumerState`.
 *
 * NOTE(review): the call chain that follows `consumerState` is not visible in this
 * listing; presumably it reads the consumed messages and decodes each one - confirm.
 */
130 private fun decodeConsumedEvents(): List<VesEvent> = consumerState
/**
 * Chooses the HTTP status code describing whether consumed events match the generated ones.
 *
 * @param generated events produced from the request parameters
 * @param consumed events read from the consumer
 * @param validatePayloads when true the whole events are compared; otherwise only
 *        their common event headers (via validateHeaders)
 * @return STATUS_OK on a match, STATUS_BAD_REQUEST otherwise
 */
private fun resolveResponseStatusCode(generated: List<VesEvent>,
                                      consumed: List<VesEvent>,
                                      validatePayloads: Boolean): Int {
    // Header-only comparison when full payload validation is not requested.
    if (!validatePayloads) {
        return validateHeaders(consumed, generated)
    }
    val eventsMatch = generated == consumed
    return if (eventsMatch) STATUS_OK else STATUS_BAD_REQUEST
}
/**
 * Compares only the common event headers of the two event lists.
 *
 * @param consumed events read from the consumer
 * @param generated events produced from the request parameters
 * @return STATUS_OK when both header lists are equal, STATUS_BAD_REQUEST otherwise
 */
private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
    val headersOfConsumed = consumed.map { event -> event.commonEventHeader }
    val headersOfGenerated = generated.map { event -> event.commonEventHeader }
    val headersMatch = headersOfGenerated == headersOfConsumed
    return when {
        headersMatch -> STATUS_OK
        else -> STATUS_BAD_REQUEST
    }
}
/**
 * Extracts the set of topic names from the request body text.
 *
 * NOTE(review): only the first step is visible in this listing; the calls that turn
 * the remaining text into a `Set<String>` are missing - confirm against the full file.
 */
151 private fun extractTopics(it: String): Set<String> =
// Keep the text after the first "="; substringAfter returns the whole string
// when no "=" is present.
152 it.substringAfter("=")
// Logger shared by all ApiServer handlers.
157 private val logger = Logger(ApiServer::class)
// Content type used for plain-text responses.
158 private const val CONTENT_TEXT = "text/plain"
// HTTP status codes sent by the handlers.
160 private const val STATUS_OK = 200
161 private const val STATUS_BAD_REQUEST = 400
162 private const val STATUS_INTERNAL_SERVER_ERROR = 500