2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
22 import com.google.protobuf.util.JsonFormat
23 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
24 import org.onap.ves.VesEventV5.VesEvent
25 import ratpack.handling.Chain
26 import ratpack.server.RatpackServer
27 import ratpack.server.ServerConfig
28 import reactor.core.publisher.Mono
31 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
/**
 * HTTP API of the DCAE app simulator: its routes read the current consumer state
 * from [consumerState] and send it back to the caller as plain text or JSON.
 */
34 class ApiServer(private val consumerState: ConsumerStateProvider) {
// Protobuf-to-JSON printer, created once per ApiServer and used by the JSON routes.
35 private val jsonPrinter = JsonFormat.printer()
/**
 * Creates a [Mono] that builds a Ratpack server bound to [port], wires in the
 * routes from [setupHandlers] and starts the server before emitting it.
 *
 * The server is built inside `Mono.fromCallable`, so nothing is created or
 * started until the returned [Mono] is subscribed.
 *
 * @param port port number handed to the embedded server configuration
 * @return a [Mono] emitting the [RatpackServer] after `start()` has been invoked on it
 */
fun start(port: Int): Mono<RatpackServer> =
        Mono.fromCallable {
            RatpackServer.of { spec ->
                val config = ServerConfig.embedded().port(port)
                spec.serverConfig(config).handlers(this::setupHandlers)
            }
        }.doOnNext { ratpack -> ratpack.start() }
/**
 * Registers the GET routes served by this API on [chain].
 *
 * Every route sets the response content type, reads `consumerState.currentState()`
 * and sends the mapped result as the response body:
 *  - `messages/count`      - `msgCount` of the current state, as plain text
 *  - `messages/last/key`   - a `VesEvent.CommonEventHeader` parsed and printed as JSON
 *  - `messages/last/value` - a `VesEvent` parsed and printed as JSON
 *
 * NOTE(review): the original line numbering has gaps around the route definitions, so
 * some lines are not visible here (for instance whatever selects the bytes handed to
 * `parseFrom`) - confirm against the full file before changing this function.
 */
44 private fun setupHandlers(chain: Chain) {
// Plain-text route: the current message count rendered with toString().
46 .get("messages/count") { ctx ->
47 ctx.response.contentType(CONTENT_TEXT)
48 consumerState.currentState()
49 .map { it.msgCount.toString() }
// NOTE(review): subscribe() receives only an onNext consumer; if the pipeline errors or
// completes without a value, nothing in this handler sends a response - confirm intended.
50 .subscribe(ctx.response::send)
// JSON route: parses a CommonEventHeader and prints it with jsonPrinter.
53 .get("messages/last/key") { ctx ->
54 ctx.response.contentType(CONTENT_JSON)
55 consumerState.currentState()
// parseFrom can reject malformed input; presumably that surfaces as a pipeline error - TODO confirm handling.
57 .map { VesEvent.CommonEventHeader.parseFrom(it) }
58 .map { jsonPrinter.print(it) }
59 .subscribe(ctx.response::send)
// JSON route: parses a full VesEvent and prints it with jsonPrinter.
62 .get("messages/last/value") { ctx ->
63 ctx.response.contentType(CONTENT_JSON)
64 consumerState.currentState()
66 .map { VesEvent.parseFrom(it) }
67 .map { jsonPrinter.print(it) }
68 .subscribe(ctx.response::send)
// Content-Type value set on the response of the messages/count route.
73 private const val CONTENT_TEXT = "text/plain"
// Content-Type value set on the responses of the messages/last/key and messages/last/value routes.
74 private const val CONTENT_JSON = "application/json"