1 package org.onap.dcae.collectors.veshv.impl.adapters
 
   3 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 
   4 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 
   5 import org.onap.ves.VesEventV5
 
   6 import org.slf4j.LoggerFactory
 
   7 import reactor.core.publisher.Flux
 
   8 import java.io.StringReader
 
   9 import java.time.Duration
 
  11 import java.util.concurrent.atomic.AtomicReference
 
  12 import javax.json.Json
 
  13 import javax.json.JsonObject
 
  17  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 
  20 internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
 
  21     : ConfigurationProvider {
 
  /** SLF4J logger bound to this provider class. */
  private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)

  /**
   * Hash code of the last response body that was accepted as a new configuration.
   *
   * The reference itself is never reassigned (only its content is replaced through
   * `get`/`set`), hence `val`. It starts out holding `null`, so the first response
   * always differs from it.
   */
  private val lastConfigurationHash: AtomicReference<Int> = AtomicReference()
 
  /**
   * Produces a stream of collector configurations fetched from [url].
   *
   * A tick is emitted immediately and then once per [REFRESH_INTERVAL]; every tick
   * triggers `http.getResponse(url)`. A response body is passed downstream only when
   * its hash code differs from the one stored in [lastConfigurationHash], which is
   * then updated. Accepted bodies are decoded by [getConfigurationJson] and mapped by
   * [createCollectorConfiguration].
   *
   * @return a [Flux] emitting a [CollectorConfiguration] for each changed response body
   */
  override fun invoke(): Flux<CollectorConfiguration> {
      val ticks = Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
      val responses = ticks.flatMap { http.getResponse(url) }

      // Drop bodies whose hash equals the last accepted one, then remember the new hash.
      val changedResponses = responses
              .filter { candidate -> candidate.hashCode() != lastConfigurationHash.get() }
              .doOnNext { accepted -> lastConfigurationHash.set(accepted.hashCode()) }

      return changedResponses
              .map { rawBody -> getConfigurationJson(rawBody) }
              .map { decoded -> createCollectorConfiguration(decoded) }
  }
 
  /**
   * Extracts the configuration document from a raw response body.
   *
   * The body is parsed as a JSON array; the `Value` string of its first element is
   * Base64-decoded, logged at INFO level and then parsed as a JSON object.
   *
   * @param str raw response body; read here as a JSON array whose first element is
   *            an object carrying a Base64-encoded `Value` string
   * @return the decoded configuration as a [JsonObject]
   */
  35     private fun getConfigurationJson(str: String): JsonObject {
 
  // Only the first element of the returned array is used.
  36         val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
 
  // "Value" holds the Base64-encoded payload; decode it back into text.
  37         val decodedValue = String(
 
  38                 Base64.getDecoder().decode(response.getString("Value")))
 
  39         logger.info("Obtained new configuration from consul:\n$decodedValue")
 
  // The decoded text is itself a JSON document; parse it as an object.
  40         return Json.createReader(StringReader(decodedValue)).readObject()
 
  /**
   * Maps the decoded configuration JSON onto a [CollectorConfiguration].
   *
   * Keys read here: the top-level `kafkaBootstrapServers` string and, from the nested
   * `routing` object, the `fromDomain` number (resolved to a `Domain` through
   * `forNumber`) and the `toTopic` string.
   *
   * @param configuration decoded configuration object
   * @return the collector configuration built from the values above
   */
  43     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
 
  45         val routing = configuration.getJsonObject("routing")
 
  47         return CollectorConfiguration(
 
  48                 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
 
  // Fully qualified call: the local `routing` value above shadows the builder's simple name.
  49                 routing = org.onap.dcae.collectors.veshv.model.routing {
 
  // NOTE(review): forNumber presumably yields null for an unknown number — confirm fromDomain tolerates that.
  51                         fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
 
  52                         toTopic(routing.getString("toTopic"))
 
  53                         withFixedPartitioning()
 
  /** Number of minutes between two consecutive configuration polls. */
  private const val REFRESH_INTERVAL_MINUTES = 5L

  /** Polling period as a [Duration], derived from [REFRESH_INTERVAL_MINUTES]. */
  private val REFRESH_INTERVAL: Duration = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)