2fa8abeceb53f7d97aef12d0c5952196b4d3befb
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
21
22 import arrow.core.Try
23 import arrow.core.getOrElse
24 import arrow.effects.IO
25 import com.google.protobuf.MessageOrBuilder
26 import com.google.protobuf.util.JsonFormat
27 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
28 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
29 import org.onap.ves.VesEventV5.VesEvent
30 import ratpack.handling.Chain
31 import ratpack.server.RatpackServer
32 import ratpack.server.ServerConfig
33
/**
 * HTTP API exposing what the simulator's consumer has seen so far.
 *
 * Routes registered on the embedded Ratpack server:
 *
 * - `GET messages/count` - `msgCount` of the current consumer state, as `text/plain`,
 * - `GET messages/last/key` - last key parsed as `VesEvent.CommonEventHeader`, as JSON,
 * - `GET messages/last/value` - last value parsed as `VesEvent`, as JSON,
 * - `GET messages/last/hvRanMeasFields` - `HVRanMeasFields` payload of the last value, as JSON,
 * - `DELETE messages` - resets the consumer state and answers `OK` or `NOK` as `text/plain`.
 *
 * The JSON routes answer the literal `null` when there is nothing to show and a JSON string
 * describing the failure when the stored bytes cannot be parsed as the expected protobuf message.
 *
 * @param consumerState source of the current consumer state; also used to reset it
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
class ApiServer(private val consumerState: ConsumerStateProvider) {
    private val jsonPrinter = JsonFormat.printer()

    /**
     * Describes starting the API server on the given port.
     *
     * Nothing is started until the returned [IO] is run.
     *
     * @param port TCP port the embedded Ratpack server should listen on
     * @return an [IO] yielding the started [RatpackServer]
     */
    fun start(port: Int): IO<RatpackServer> = IO {
        RatpackServer.start { server ->
            server.serverConfig(ServerConfig.embedded().port(port))
                    .handlers(this::setupHandlers)
        }
    }

    /**
     * Registers all routes of the API on the given handler chain.
     */
    private fun setupHandlers(chain: Chain) {
        chain
                .get("messages/count") { ctx ->
                    ctx.response.contentType(CONTENT_TEXT)
                    val state = consumerState.currentState()
                    ctx.response.send(state.msgCount.toString())
                }

                .get("messages/last/key") { ctx ->
                    ctx.response.contentType(CONTENT_JSON)
                    val state = consumerState.currentState()
                    // No key -> "null"; unparsable key -> JSON string with the failure message.
                    val resp = state.lastKey
                            .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
                            .map(this::protobufToJson)
                            .getOrElse { "null" }
                    ctx.response.send(resp)
                }

                .get("messages/last/value") { ctx ->
                    ctx.response.contentType(CONTENT_JSON)
                    val state = consumerState.currentState()
                    // No value -> "null"; unparsable value -> JSON string with the failure message.
                    val resp = state.lastValue
                            .map { Try { VesEvent.parseFrom(it) } }
                            .map(this::protobufToJson)
                            .getOrElse { "null" }
                    ctx.response.send(resp)
                }

                .get("messages/last/hvRanMeasFields") { ctx ->
                    ctx.response.contentType(CONTENT_JSON)
                    val state = consumerState.currentState()
                    // Here an unparsable VesEvent is treated like a missing one (toOption drops the
                    // failure), and events of any domain other than HVRANMEAS are filtered out, so
                    // both cases answer "null". Only a failure to parse the nested payload is
                    // reported as a JSON string.
                    val resp = state.lastValue
                            .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
                            .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
                            .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
                            .map(this::protobufToJson)
                            .getOrElse { "null" }
                    ctx.response.send(resp)
                }

                .delete("messages") { ctx ->
                    ctx.response.contentType(CONTENT_TEXT)
                    consumerState.reset()
                            .unsafeRunAsync {
                                // First branch handles a failed reset, second a successful one.
                                it.fold(
                                        { ctx.response.send("NOK") },
                                        { ctx.response.send("OK") }
                                )
                            }
                }
    }

    /**
     * Renders the outcome of a protobuf parse attempt as a JSON document.
     *
     * @param parseResult successfully parsed message or the failure raised while parsing
     * @return the message printed with [JsonFormat], or a JSON string literal describing the failure
     */
    private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
            parseResult.fold(
                    // The exception message is arbitrary text, so it has to be encoded as a JSON
                    // string instead of being pasted between quotes - otherwise a quote, backslash
                    // or control character in it would produce an invalid JSON response.
                    { ex -> jsonString("Failed to parse protobuf: ${ex.message}") },
                    { jsonPrinter.print(it) })

    /**
     * Encodes [text] as a JSON string literal, including the surrounding double quotes.
     *
     * Double quotes, backslashes and control characters below U+0020 are escaped; every other
     * character is emitted unchanged.
     */
    private fun jsonString(text: String): String = buildString {
        append('"')
        for (ch in text) {
            when (ch) {
                '"' -> append("\\\"")
                '\\' -> append("\\\\")
                '\b' -> append("\\b")
                '\u000C' -> append("\\f")
                '\n' -> append("\\n")
                '\r' -> append("\\r")
                '\t' -> append("\\t")
                else ->
                    if (ch < ' ') append("\\u%04x".format(ch.toInt()))
                    else append(ch)
            }
        }
        append('"')
    }

    companion object {
        private const val CONTENT_TEXT = "text/plain"
        private const val CONTENT_JSON = "application/json"
    }
}