263e7d44a1f1504b8da9c1be149fb165af42f39e
[dcaegen2/collectors/hv-ves.git] /
1 package org.onap.dcae.collectors.veshv.impl.adapters
2
3 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
4 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
5 import org.onap.ves.VesEventV5
6 import org.slf4j.LoggerFactory
7 import reactor.core.publisher.Flux
8 import java.io.StringReader
9 import java.time.Duration
10 import java.util.*
11 import java.util.concurrent.atomic.AtomicReference
12 import javax.json.Json
13 import javax.json.JsonObject
14
15
/**
 * [ConfigurationProvider] that polls a Consul HTTP endpoint and emits a [CollectorConfiguration]
 * whenever the response body differs from the previously forwarded one.
 *
 * The endpoint at [url] is queried through [http] with zero initial delay and then once every
 * [REFRESH_INTERVAL]. The response body is expected to be a JSON array whose first element holds
 * the Base64-encoded configuration document in its `Value` field.
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
    : ConfigurationProvider {

    private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)

    // Hash of the last response body that was passed downstream; holds null until the first
    // response arrives. The state lives on the instance, so it is shared by every Flux
    // returned from invoke().
    private val lastConfigurationHash: AtomicReference<Int> = AtomicReference()

    /**
     * Builds the polling pipeline.
     *
     * On every tick the raw response body is fetched from [url]. A body whose hash equals the
     * hash of the most recently forwarded body is dropped; any other body is remembered, decoded
     * and mapped to a [CollectorConfiguration]. No error handling is applied in this pipeline.
     *
     * @return stream of configurations, one per detected change of the response body
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
                    .flatMap { http.getResponse(url) }
                    // Only the hash is compared, not the body itself.
                    .filter { body -> body.hashCode() != lastConfigurationHash.get() }
                    .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
                    .map { str -> getConfigurationJson(str) }
                    .map { json -> createCollectorConfiguration(json) }

    /**
     * Extracts the configuration document from a raw response body.
     *
     * @param str response body: a JSON array whose first element has a Base64-encoded `Value` string
     * @return the decoded `Value` parsed as a JSON object
     */
    private fun getConfigurationJson(str: String): JsonObject {
        // JsonReader is Closeable; `use` releases it even when parsing throws.
        val response = Json.createReader(StringReader(str)).use { it.readArray() }.getJsonObject(0)
        // Decode explicitly as UTF-8 so the result does not depend on the JVM default charset.
        val decodedValue = String(
                Base64.getDecoder().decode(response.getString("Value")),
                Charsets.UTF_8)
        logger.info("Obtained new configuration from consul:\n$decodedValue")
        return Json.createReader(StringReader(decodedValue)).use { it.readObject() }
    }

    /**
     * Maps the configuration document onto a [CollectorConfiguration].
     *
     * Reads the top-level `kafkaBootstrapServers` string and a `routing` object with an integer
     * `fromDomain` and a string `toTopic`, producing a single route with fixed partitioning.
     *
     * @param configuration decoded configuration document
     * @return collector configuration containing exactly one route
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
        val routingJson = configuration.getJsonObject("routing")

        return CollectorConfiguration(
                kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
                // Fully qualified: the routing builder function is not among this file's imports.
                routing = org.onap.dcae.collectors.veshv.model.routing {
                    defineRoute {
                        // NOTE(review): forNumber presumably yields null for an unknown domain
                        // number (protobuf-generated enum) — confirm how fromDomain handles that.
                        fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routingJson.getInt("fromDomain")))
                        toTopic(routingJson.getString("toTopic"))
                        withFixedPartitioning()
                    }
                }.build()
        )
    }

    companion object {
        private const val REFRESH_INTERVAL_MINUTES: Long = 5

        // Period between consecutive polls of the Consul endpoint.
        private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
    }
}