fcb8e1318b9048f1bbc27cb9bd95baed00d4cfdf
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
21
22 import com.google.protobuf.util.JsonFormat
23 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
24 import org.onap.ves.VesEventV5.VesEvent
25 import ratpack.handling.Chain
26 import ratpack.server.RatpackServer
27 import ratpack.server.ServerConfig
28 import reactor.core.publisher.Mono
29
30 /**
31  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
32  * @since May 2018
33  */
34 class ApiServer(private val consumerState: ConsumerStateProvider) {
35     private val jsonPrinter = JsonFormat.printer()
36
37     fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
38         RatpackServer.of { server ->
39             server.serverConfig(ServerConfig.embedded().port(port))
40                     .handlers(this::setupHandlers)
41         }
42     }.doOnNext(RatpackServer::start)
43
44     private fun setupHandlers(chain: Chain) {
45         chain
46                 .get("messages/count") { ctx ->
47                     ctx.response.contentType(CONTENT_TEXT)
48                     consumerState.currentState()
49                             .map { it.msgCount.toString() }
50                             .subscribe(ctx.response::send)
51                 }
52
53                 .get("messages/last/key") { ctx ->
54                     ctx.response.contentType(CONTENT_JSON)
55                     consumerState.currentState()
56                             .map { it.lastKey }
57                             .map { VesEvent.CommonEventHeader.parseFrom(it) }
58                             .map { jsonPrinter.print(it) }
59                             .subscribe(ctx.response::send)
60                 }
61
62                 .get("messages/last/value") { ctx ->
63                     ctx.response.contentType(CONTENT_JSON)
64                     consumerState.currentState()
65                             .map { it.lastValue }
66                             .map { VesEvent.parseFrom(it) }
67                             .map { jsonPrinter.print(it) }
68                             .subscribe(ctx.response::send)
69                 }
70     }
71
72     companion object {
73         private const val CONTENT_TEXT = "text/plain"
74         private const val CONTENT_JSON = "application/json"
75     }
76 }