*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.void
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.slf4j.LoggerFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
private val logger = Logger(PACKAGE_NAME)
)
-private fun startApp(config: DcaeAppSimConfiguration) =
- KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics)
- .start()
- .map(::ApiServer)
- .flatMap { it.start(config.apiPort).void() }
+private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
+ return ApiServer(ConsumerFactory(config.kafkaBootstrapServers))
+ .start(config.apiPort, config.kafkaTopics)
+ .void()
+}
\ No newline at end of file
import arrow.effects.IO
import com.google.protobuf.MessageOrBuilder
import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val consumerState: ConsumerStateProvider) {
+class ApiServer(private val consumerFactory: ConsumerFactory) {
+
+ private lateinit var consumerState: ConsumerStateProvider
private val jsonPrinter = JsonFormat.printer()
- fun start(port: Int): IO<RatpackServer> = IO {
+ fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
+ consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
RatpackServer.start { server ->
server.serverConfig(ServerConfig.embedded().port(port))
.handlers(this::setupHandlers)
private fun setupHandlers(chain: Chain) {
chain
+ .put("configuration/topics") { ctx ->
+ ctx.request.body.then { it ->
+ val topics = extractTopics(it.getText())
+ logger.info("Received new configuration. Creating consumer for topics: $topics")
+ consumerState = consumerFactory.createConsumerForTopics(topics)
+ ctx.response.contentType(CONTENT_TEXT)
+ ctx.response.send("OK")
+ }
+
+ }
+
.get("messages/count") { ctx ->
ctx.response.contentType(CONTENT_TEXT)
val state = consumerState.currentState()
}
}
+ private fun extractTopics(it: String): Set<String> =
+ it.substringAfter("=")
+ .split(",")
+ .toSet()
+
private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
parseResult.fold(
{ ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
companion object {
+ private val logger = Logger(ApiServer::class)
+
private const val CONTENT_TEXT = "text/plain"
private const val CONTENT_JSON = "application/json"
}