39b4fe2f9280431679806c1a967b14a9778f46c1
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
21
22 import arrow.core.Try
23 import arrow.core.getOrElse
24 import arrow.effects.IO
25 import com.google.protobuf.MessageOrBuilder
26 import com.google.protobuf.util.JsonFormat
27 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
28 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
29 import org.onap.dcae.collectors.veshv.utils.logging.Logger
30 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
31 import org.onap.ves.VesEventV5.VesEvent
32 import ratpack.handling.Chain
33 import ratpack.server.RatpackServer
34 import ratpack.server.ServerConfig
35
36 /**
37  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
38  * @since May 2018
39  */
40 class ApiServer(private val consumerFactory: ConsumerFactory) {
41
42     private lateinit var consumerState: ConsumerStateProvider
43     private val jsonPrinter = JsonFormat.printer()
44
45     fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
46         consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
47         RatpackServer.start { server ->
48             server.serverConfig(ServerConfig.embedded().port(port))
49                     .handlers(this::setupHandlers)
50         }
51     }
52
53     private fun setupHandlers(chain: Chain) {
54         chain
55                 .put("configuration/topics") { ctx ->
56                     ctx.request.body.then { it ->
57                         val topics = extractTopics(it.getText())
58                         logger.info("Received new configuration. Creating consumer for topics: $topics")
59                         consumerState = consumerFactory.createConsumerForTopics(topics)
60                         ctx.response.contentType(CONTENT_TEXT)
61                         ctx.response.send("OK")
62                     }
63
64                 }
65
66                 .get("messages/count") { ctx ->
67                     ctx.response.contentType(CONTENT_TEXT)
68                     val state = consumerState.currentState()
69                     ctx.response.send(state.msgCount.toString())
70                 }
71
72                 .get("messages/last/key") { ctx ->
73                     ctx.response.contentType(CONTENT_JSON)
74                     val state = consumerState.currentState()
75                     val resp = state.lastKey
76                             .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
77                             .map(this::protobufToJson)
78                             .getOrElse { "null" }
79                     ctx.response.send(resp)
80                 }
81
82                 .get("messages/last/value") { ctx ->
83                     ctx.response.contentType(CONTENT_JSON)
84                     val state = consumerState.currentState()
85                     val resp = state.lastValue
86                             .map { Try { VesEvent.parseFrom(it) } }
87                             .map(this::protobufToJson)
88                             .getOrElse { "null" }
89                     ctx.response.send(resp)
90                 }
91
92                 .get("messages/last/hvRanMeasFields") { ctx ->
93                     ctx.response.contentType(CONTENT_JSON)
94                     val state = consumerState.currentState()
95                     val resp = state.lastValue
96                             .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
97                             .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
98                             .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
99                             .map(this::protobufToJson)
100                             .getOrElse { "null" }
101                     ctx.response.send(resp)
102                 }
103
104                 .delete("messages") { ctx ->
105                     ctx.response.contentType(CONTENT_TEXT)
106                     consumerState.reset()
107                             .unsafeRunAsync {
108                                 it.fold(
109                                         { ctx.response.send("NOK") },
110                                         { ctx.response.send("OK") }
111                                 )
112                             }
113                 }
114     }
115
116     private fun extractTopics(it: String): Set<String> =
117             it.substringAfter("=")
118                     .split(",")
119                     .toSet()
120
121     private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
122             parseResult.fold(
123                     { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
124                     { jsonPrinter.print(it) })
125
126
127     companion object {
128         private val logger = Logger(ApiServer::class)
129
130         private const val CONTENT_TEXT = "text/plain"
131         private const val CONTENT_JSON = "application/json"
132     }
133 }