1 package org.onap.dcae.collectors.veshv.impl.adapters
3 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
4 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
5 import org.onap.ves.VesEventV5
6 import org.slf4j.LoggerFactory
7 import reactor.core.publisher.Flux
8 import java.io.StringReader
9 import java.time.Duration
11 import java.util.concurrent.atomic.AtomicReference
12 import javax.json.Json
13 import javax.json.JsonObject
17 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
    : ConfigurationProvider {

    // Periodically requests [url] through [http], decodes the Base64-encoded "Value" field of the
    // first element of the returned JSON array and publishes it as a CollectorConfiguration.
    // A response body is only processed when its hash differs from the previously seen one.

    private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)

    // Hash of the most recently accepted response body; holds null until the first response arrives.
    private val lastConfigurationHash: AtomicReference<Int> = AtomicReference()

    /**
     * Builds the stream of configurations.
     *
     * The first request is issued immediately (zero initial delay) and then repeated every
     * [REFRESH_INTERVAL]. Bodies whose hash equals the previously recorded one are dropped.
     *
     * @return a [Flux] emitting one [CollectorConfiguration] per changed response body
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
                    .flatMap { http.getResponse(url) }
                    .filter { body -> isNewConfiguration(body) }
                    .map { body -> getConfigurationJson(body) }
                    .map { json -> createCollectorConfiguration(json) }

    /**
     * Records the hash of [body] and reports whether it differs from the one recorded before.
     *
     * Uses a single `getAndSet` so that the comparison and the update cannot be interleaved with
     * another caller. Storing the same hash again when nothing changed is harmless.
     */
    private fun isNewConfiguration(body: String): Boolean {
        val currentHash = body.hashCode()
        return lastConfigurationHash.getAndSet(currentHash) != currentHash
    }

    /**
     * Extracts the configuration document from a raw response body.
     *
     * @param str JSON text holding an array; the first element must be an object with a
     *            Base64-encoded string under the key "Value"
     * @return the decoded "Value" content parsed as a JSON object
     */
    private fun getConfigurationJson(str: String): JsonObject {
        // `use` closes the reader even when parsing throws.
        val response = Json.createReader(StringReader(str))
                .use { reader -> reader.readArray() }
                .getJsonObject(0)
        val decodedValue = String(
                Base64.getDecoder().decode(response.getString("Value")))
        logger.info("Obtained new configuration from consul:\n$decodedValue")
        return Json.createReader(StringReader(decodedValue))
                .use { reader -> reader.readObject() }
    }

    /**
     * Maps the decoded configuration document onto a [CollectorConfiguration].
     *
     * Reads the string "kafkaBootstrapServers" and, from the nested "routing" object, the integer
     * "fromDomain" and the string "toTopic".
     *
     * @param configuration decoded configuration document
     * @return configuration object built from the values above
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
        val routing = configuration.getJsonObject("routing")

        // NOTE(review): Domain.forNumber presumably yields null for a number that is not a known
        // domain - confirm how fromDomain is expected to react to that.
        // NOTE(review): the builder calls below are reproduced exactly as they appear in this file;
        // verify against the routing DSL that no additional wrapping call is required.
        return CollectorConfiguration(
                kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
                // Fully qualified because the local `routing` value shadows the builder function.
                routing = org.onap.dcae.collectors.veshv.model.routing {
                    fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
                    toTopic(routing.getString("toTopic"))
                    withFixedPartitioning()
                }
        )
    }

    companion object {
        private const val REFRESH_INTERVAL_MINUTES: Long = 5

        // Period between two consecutive configuration requests.
        private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
    }
}