Add API endpoint for DCAE App Sim reconfiguration 37/58737/1
authorfkrzywka <filip.krzywka@nokia.com>
Thu, 12 Jul 2018 12:46:58 +0000 (14:46 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 13:14:02 +0000 (15:14 +0200)
Closes ONAP-547

Change-Id: I26b7d5d61eb984d58600a612cd8d3e4dd7be05c5
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt

index 6616953..7db6920 100644 (file)
@@ -66,3 +66,9 @@ class Consumer : ConsumerStateProvider {
         private val logger = Logger(Consumer::class)
     }
 }
+
+class ConsumerFactory(val kafkaBootstrapServers: String) {
+    fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
+        return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
+    }
+}
index a8a4cf5..fb28bc2 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp
 
+import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.arrow.void
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.slf4j.LoggerFactory
 
 private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
 private val logger = Logger(PACKAGE_NAME)
@@ -49,8 +49,8 @@ fun main(args: Array<String>) =
                 )
 
 
-private fun startApp(config: DcaeAppSimConfiguration) =
-        KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics)
-                .start()
-                .map(::ApiServer)
-                .flatMap { it.start(config.apiPort).void() }
+private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
+    return ApiServer(ConsumerFactory(config.kafkaBootstrapServers))
+            .start(config.apiPort, config.kafkaTopics)
+            .void()
+}
\ No newline at end of file
index 2fa8abe..39b4fe2 100644 (file)
@@ -24,7 +24,9 @@ import arrow.core.getOrElse
 import arrow.effects.IO
 import com.google.protobuf.MessageOrBuilder
 import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
 import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
 import org.onap.ves.VesEventV5.VesEvent
 import ratpack.handling.Chain
@@ -35,10 +37,13 @@ import ratpack.server.ServerConfig
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class ApiServer(private val consumerState: ConsumerStateProvider) {
+class ApiServer(private val consumerFactory: ConsumerFactory) {
+
+    private lateinit var consumerState: ConsumerStateProvider
     private val jsonPrinter = JsonFormat.printer()
 
-    fun start(port: Int): IO<RatpackServer> = IO {
+    fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
+        consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
         RatpackServer.start { server ->
             server.serverConfig(ServerConfig.embedded().port(port))
                     .handlers(this::setupHandlers)
@@ -47,6 +52,17 @@ class ApiServer(private val consumerState: ConsumerStateProvider) {
 
     private fun setupHandlers(chain: Chain) {
         chain
+                .put("configuration/topics") { ctx ->
+                    ctx.request.body.then { it ->
+                        val topics = extractTopics(it.getText())
+                        logger.info("Received new configuration. Creating consumer for topics: $topics")
+                        consumerState = consumerFactory.createConsumerForTopics(topics)
+                        ctx.response.contentType(CONTENT_TEXT)
+                        ctx.response.send("OK")
+                    }
+
+                }
+
                 .get("messages/count") { ctx ->
                     ctx.response.contentType(CONTENT_TEXT)
                     val state = consumerState.currentState()
@@ -97,6 +113,11 @@ class ApiServer(private val consumerState: ConsumerStateProvider) {
                 }
     }
 
+    private fun extractTopics(it: String): Set<String> =
+            it.substringAfter("=")
+                    .split(",")
+                    .toSet()
+
     private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
             parseResult.fold(
                     { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
@@ -104,6 +125,8 @@ class ApiServer(private val consumerState: ConsumerStateProvider) {
 
 
     companion object {
+        private val logger = Logger(ApiServer::class)
+
         private const val CONTENT_TEXT = "text/plain"
         private const val CONTENT_JSON = "application/json"
     }
index 1661aea..8418cd7 100644 (file)
@@ -27,7 +27,6 @@ import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.void
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger