2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
25 import org.onap.dcae.collectors.veshv.utils.logging.Logger
26 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
27 import org.slf4j.LoggerFactory
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import reactor.retry.Jitter
31 import reactor.retry.Retry
32 import java.io.StringReader
33 import java.time.Duration
35 import java.util.concurrent.atomic.AtomicReference
36 import javax.json.Json
37 import javax.json.JsonObject
41 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
44 internal class ConsulConfigurationProvider(private val http: HttpAdapter,
45 private val url: String,
46 private val firstRequestDelay: Duration,
47 private val requestInterval: Duration,
49 ) : ConfigurationProvider {
51 private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
52 private val retry = retrySpec
54 logger.warn("Could not get fresh configuration", it.exception())
57 constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
59 params.configurationUrl,
60 params.firstRequestDelay,
61 params.requestInterval,
63 .retryMax(MAX_RETRIES)
64 .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
65 .jitter(Jitter.random()))
67 override fun invoke(): Flux<CollectorConfiguration> =
68 Flux.interval(firstRequestDelay, requestInterval)
69 .flatMap { askForConfig() }
70 .map(::parseJsonResponse)
71 .map(::extractEncodedConfiguration)
72 .flatMap(::filterDifferentValues)
73 .map(::decodeConfiguration)
74 .map(::createCollectorConfiguration)
77 private fun askForConfig(): Mono<String> = http.get(url)
79 private fun parseJsonResponse(responseString: String): JsonObject =
80 Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
82 private fun extractEncodedConfiguration(response: JsonObject): String =
83 response.getString("Value")
85 private fun filterDifferentValues(base64Value: String): Mono<String> {
86 val newHash = hashOf(base64Value)
87 return if (newHash == lastConfigurationHash.get()) {
90 lastConfigurationHash.set(newHash)
91 Mono.just(base64Value)
95 private fun hashOf(str: String) = str.hashCode()
97 private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
98 val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration))
99 logger.info("Obtained new configuration from consul:\n$decodedValue")
100 return Json.createReader(StringReader(decodedValue)).readObject()
103 private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
104 val routing = configuration.getJsonArray("routing")
106 return CollectorConfiguration(
107 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
108 routing = org.onap.dcae.collectors.veshv.model.routing {
109 for (route in routing) {
110 val routeObj = route.asJsonObject()
112 fromDomain(forNumber(routeObj.getInt("fromDomain")))
113 toTopic(routeObj.getString("toTopic"))
114 withFixedPartitioning()
122 private const val MAX_RETRIES = 5
123 private const val BACKOFF_INTERVAL_FACTOR = 30L
124 private val logger = Logger(ConsulConfigurationProvider::class)