2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
25 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
26 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
27 import org.onap.dcae.collectors.veshv.model.ServiceContext
28 import org.onap.dcae.collectors.veshv.model.routing
29 import org.onap.dcae.collectors.veshv.utils.logging.Logger
30 import org.onap.dcae.collectors.veshv.utils.logging.Marker
31 import reactor.core.publisher.Flux
32 import reactor.core.publisher.Mono
33 import reactor.retry.Jitter
34 import reactor.retry.Retry
35 import java.io.StringReader
36 import java.security.MessageDigest
37 import java.time.Duration
39 import java.util.concurrent.atomic.AtomicReference
40 import javax.json.Json
41 import javax.json.JsonObject
45 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
48 internal class ConsulConfigurationProvider(private val http: HttpAdapter,
49 private val url: String,
50 private val firstRequestDelay: Duration,
51 private val requestInterval: Duration,
52 private val healthState: HealthState,
55 ) : ConfigurationProvider {
56 private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
57 private val retry = retrySpec.doOnRetry {
58 logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
59 healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
62 constructor(http: HttpAdapter,
63 params: ConfigurationProviderParams) : this(
65 params.configurationUrl,
66 params.firstRequestDelay,
67 params.requestInterval,
70 .retryMax(MAX_RETRIES)
71 .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
72 .jitter(Jitter.random())
75 override fun invoke(): Flux<CollectorConfiguration> =
76 Flux.interval(firstRequestDelay, requestInterval)
77 .concatMap { askForConfig() }
78 .flatMap(::filterDifferentValues)
79 .map(::parseJsonResponse)
80 .map(::createCollectorConfiguration)
83 private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
84 val invocationId = UUID.randomUUID()
85 http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
88 private fun filterDifferentValues(configuration: BodyWithInvocationId) =
89 configuration.body.let { configurationString ->
90 configurationString.sha256().let { newHash ->
91 if (newHash contentEquals lastConfigurationHash.get()) {
92 logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
93 "No change detected in consul configuration"
97 logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
98 "Obtained new configuration from consul:\n$configurationString"
100 lastConfigurationHash.set(newHash)
101 Mono.just(configurationString)
106 private fun parseJsonResponse(responseString: String): JsonObject =
107 Json.createReader(StringReader(responseString)).readObject()
109 private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
111 val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)
112 CollectorConfiguration(
114 for (route in routingArray) {
115 val routeObj = route.asJsonObject()
117 fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))
118 toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))
119 withFixedPartitioning()
124 } catch (e: NullPointerException) {
125 throw ParsingException("Failed to parse consul configuration", e)
130 private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
131 private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
132 private const val TOPIC_CONFIGURATION_KEY = "toTopic"
134 private const val MAX_RETRIES = 5L
135 private const val BACKOFF_INTERVAL_FACTOR = 30L
136 private val logger = Logger(ConsulConfigurationProvider::class)
137 private fun String.sha256() =
139 .getInstance("SHA-256")
140 .digest(toByteArray())
144 private data class BodyWithInvocationId(val body: String, val invocationId: UUID)