DCAE APP simulator rework
[dcaegen2/collectors/hv-ves.git] / hv-collector-dcae-app-simulator / src / main / kotlin / org / onap / dcae / collectors / veshv / simulators / dcaeapp / remote / ApiServer.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
21
22 import arrow.effects.IO
23 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
24 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
25 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
26 import org.onap.dcae.collectors.veshv.utils.logging.Logger
27 import org.onap.dcae.collectors.veshv.utils.messages.MessageParametersParser
28 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
29 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
30 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
31 import org.onap.ves.VesEventV5.VesEvent
32 import ratpack.handling.Chain
33 import ratpack.handling.Context
34 import ratpack.server.RatpackServer
35 import ratpack.server.ServerConfig
36 import reactor.core.publisher.Mono
37 import javax.json.Json
38
39 /**
40  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
41  * @since May 2018
42  */
43 class ApiServer(private val consumerFactory: ConsumerFactory,
44                 private val messageParametersParser: MessageParametersParser = MessageParametersParser()) {
45
46     private lateinit var consumerState: ConsumerStateProvider
47
48     fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
49         consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
50         RatpackServer.start { server ->
51             server.serverConfig(ServerConfig.embedded().port(port))
52                     .handlers(this::setupHandlers)
53         }
54     }
55
56     private fun setupHandlers(chain: Chain) {
57         chain
58                 .put("configuration/topics") { ctx ->
59                     ctx.request.body.then { it ->
60                         val topics = extractTopics(it.text)
61                         logger.info("Received new configuration. Creating consumer for topics: $topics")
62                         consumerState = consumerFactory.createConsumerForTopics(topics)
63                         ctx.response
64                                 .status(STATUS_OK)
65                                 .send()
66                     }
67
68                 }
69                 .delete("messages") { ctx ->
70                     ctx.response.contentType(CONTENT_TEXT)
71                     consumerState.reset()
72                             .unsafeRunAsync {
73                                 it.fold(
74                                         { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
75                                         { ctx.response.status(STATUS_OK) }
76                                 ).send()
77                             }
78                 }
79                 .get("messages/all/count") { ctx ->
80                     val state = consumerState.currentState()
81                     ctx.response
82                             .contentType(CONTENT_TEXT)
83                             .send(state.messagesCount.toString())
84                 }
85                 .post("messages/all/validate") { ctx ->
86                     ctx.request.body
87                             .map { Json.createReader(it.inputStream).readArray() }
88                             .map { messageParametersParser.parse(it) }
89                             .map { generateEvents(ctx, it) }
90                             .then { (generatedEvents, shouldValidatePayloads) ->
91                                 generatedEvents
92                                         .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
93                                         .block()
94                             }
95                 }
96     }
97
98     private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
99             Pair<Mono<List<VesEvent>>, Boolean> = Pair(
100
101             doGenerateEvents(parameters).doOnError {
102                 logger.error("Error occurred when generating messages: $it")
103                 ctx.response
104                         .status(STATUS_INTERNAL_SERVER_ERROR)
105                         .send()
106             },
107             parameters.all { it.messageType == FIXED_PAYLOAD }
108     )
109
110     private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
111             .createMessageFlux(parameters)
112             .map(PayloadWireFrameMessage::payload)
113             .map { decode(it.unsafeAsArray()) }
114             .collectList()
115
116
117     private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
118
119
120     private fun sendResponse(ctx: Context,
121                              generatedEvents: List<VesEvent>,
122                              shouldValidatePayloads: Boolean) =
123             resolveResponseStatusCode(
124                     generated = generatedEvents,
125                     consumed = decodeConsumedEvents(),
126                     validatePayloads = shouldValidatePayloads
127             ).let { ctx.response.status(it).send() }
128
129
130     private fun decodeConsumedEvents(): List<VesEvent> = consumerState
131             .currentState()
132             .consumedMessages
133             .map(::decode)
134
135
136     private fun resolveResponseStatusCode(generated: List<VesEvent>,
137                                           consumed: List<VesEvent>,
138                                           validatePayloads: Boolean): Int =
139             if (validatePayloads) {
140                 if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
141             } else {
142                 validateHeaders(consumed, generated)
143             }
144
145     private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
146         val consumedHeaders = consumed.map { it.commonEventHeader }
147         val generatedHeaders = generated.map { it.commonEventHeader }
148         return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
149     }
150
151     private fun extractTopics(it: String): Set<String> =
152             it.substringAfter("=")
153                     .split(",")
154                     .toSet()
155
156     companion object {
157         private val logger = Logger(ApiServer::class)
158         private const val CONTENT_TEXT = "text/plain"
159
160         private const val STATUS_OK = 200
161         private const val STATUS_BAD_REQUEST = 400
162         private const val STATUS_INTERNAL_SERVER_ERROR = 500
163     }
164 }
165
166