2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.routing
25 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
26 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
27 import org.slf4j.LoggerFactory
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import reactor.ipc.netty.http.client.HttpClientException
31 import reactor.retry.Retry
32 import reactor.retry.retryExponentialBackoff
33 import java.io.StringReader
34 import java.time.Duration
36 import java.util.concurrent.TimeUnit
37 import java.util.concurrent.atomic.AtomicReference
38 import javax.json.Json
39 import javax.json.JsonObject
43 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
46 internal class ConsulConfigurationProvider(private val url: String,
47 private val http: HttpAdapter,
48 private val firstRequestDelay: Duration
49 ) : ConfigurationProvider {
// ModifyIndex taken from the most recently parsed consul response; its current value is
// handed to the HTTP adapter under the "index" key when the consul request is built.
private val lastModifyIndex = AtomicReference<Int>(0)

// Hash of the encoded configuration that was accepted last; used to detect changed values.
private val lastConfigurationHash = AtomicReference<Int>(0)
/**
 * Provides the stream of collector configurations: the default configuration comes first,
 * followed by whatever the consul-backed stream emits.
 */
override fun invoke(): Flux<CollectorConfiguration> {
    val defaultConfiguration = createDefaultConfigurationFlux()
    val consulConfigurations = createConsulFlux()
    return Flux.concat(defaultConfiguration, consulConfigurations)
}
/**
 * Creates a single-element publisher with the built-in configuration: Kafka bootstrap servers
 * "kafka:9092" and a route to the "ves_hvRanMeas" topic with fixed partitioning.
 * An info message is logged when the element is emitted.
 *
 * NOTE(review): the source domain of the default route is presumably HVRANMEAS (see imports) - confirm.
 */
57 private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
58 CollectorConfiguration(
59 kafkaBootstrapServers = "kafka:9092",
63 toTopic("ves_hvRanMeas")
64 withFixedPartitioning()
67 ).doOnNext { logger.info("Applied default configuration") }
/**
 * Builds the stream of configurations obtained from consul.
 *
 * The response of the GET on [url] is parsed, its ModifyIndex is remembered, the encoded value is
 * extracted and passed through [filterDifferentValues], and the result is decoded and converted
 * into a [CollectorConfiguration]. Subscription is postponed by [firstRequestDelay].
 */
69 private fun createConsulFlux(): Flux<CollectorConfiguration> =
// NOTE(review): lastModifyIndex.get() is evaluated when this method is called, not on each
// subscription, so a re-subscription of the same chain would reuse the same "index" value - confirm.
70 http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
// NOTE(review): the message announces a shutdown; whether one is actually triggered is not decided here - confirm.
72 logger.error("Encountered an error " +
73 "when trying to acquire configuration from consul. Shutting down..")
75 .map(::parseJsonResponse)
// Side effect only: stores the ModifyIndex of the parsed entry.
76 .doOnNext(::updateModifyIndex)
77 .map(::extractEncodedConfiguration)
// flatMap with a Mono lets the filter step emit either the value or nothing.
78 .flatMap(::filterDifferentValues)
79 .map(::decodeConfiguration)
80 .map(::createCollectorConfiguration)
82 .delaySubscription(firstRequestDelay)
/**
 * Parses the raw consul response and returns its first entry.
 *
 * The response is read as a JSON array and the first element is returned as a JSON object.
 * The JSON reader is closed once parsing is finished.
 *
 * @param responseString response body to parse
 * @return first element of the response array
 * @throws NoSuchElementException when the response array is empty
 */
private fun parseJsonResponse(responseString: String): JsonObject =
    Json.createReader(StringReader(responseString)).use { reader ->
        reader.readArray().firstOrNull()?.asJsonObject()
            ?: throw NoSuchElementException("Consul response does not contain any entries")
    }
/**
 * Stores the integer found under "ModifyIndex" in the given response entry as the
 * last seen modify index.
 */
private fun updateModifyIndex(response: JsonObject) {
    val modifyIndex = response.getInt("ModifyIndex")
    lastModifyIndex.set(modifyIndex)
}
/**
 * Returns the string stored under "Value" in the given response entry
 * (later stages treat it as Base64-encoded configuration).
 */
private fun extractEncodedConfiguration(response: JsonObject): String {
    return response.getString("Value")
}
/**
 * Compares the hash of the given encoded configuration with the hash remembered in
 * [lastConfigurationHash]. For a value that differs, the new hash is remembered and the value
 * is emitted.
 *
 * NOTE(review): presumably nothing is emitted when the hashes are equal - confirm.
 * NOTE(review): the comparison relies on String.hashCode only, so two different values with
 * the same hash would be treated as unchanged.
 * NOTE(review): get() followed by set() is not one atomic step; assumes values arrive sequentially - confirm.
 */
93 private fun filterDifferentValues(base64Value: String): Mono<String> {
94 val newHash = hashOf(base64Value)
95 return if (newHash == lastConfigurationHash.get()) {
98 lastConfigurationHash.set(newHash)
99 Mono.just(base64Value)
/** Returns the hash code of the given string; used to detect configuration changes. */
private fun hashOf(str: String): Int {
    return str.hashCode()
}
/**
 * Decodes a Base64-encoded configuration and parses it as a JSON object.
 *
 * The decoded text is logged at info level before parsing. The JSON reader is closed once
 * the object has been read.
 *
 * @param encodedConfiguration Base64-encoded JSON document
 * @return parsed configuration
 * @throws IllegalArgumentException when the input is not valid Base64
 */
private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
    // UTF-8 is what String(ByteArray) uses in Kotlin anyway; spelling it out documents the intent.
    val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration), Charsets.UTF_8)
    logger.info("Obtained new configuration from consul:\n$decodedValue")
    return Json.createReader(StringReader(decodedValue)).use { reader -> reader.readObject() }
}
/**
 * Converts the decoded JSON configuration into a [CollectorConfiguration].
 *
 * Reads "kafkaBootstrapServers" as a string and iterates over the "routing" array; for every
 * element the "fromDomain" number and the "toTopic" string are used to define a route with
 * fixed partitioning.
 */
111 private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
// This local shadows the imported routing builder function, hence the fully qualified call below.
112 val routing = configuration.getJsonArray("routing")
114 return CollectorConfiguration(
115 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
116 routing = org.onap.dcae.collectors.veshv.model.routing {
117 for (route in routing) {
118 val routeObj = route.asJsonObject()
// NOTE(review): forNumber presumably yields null for a number that matches no domain - confirm how that is handled.
120 fromDomain(forNumber(routeObj.getInt("fromDomain")))
121 toTopic(routeObj.getString("toTopic"))
122 withFixedPartitioning()
// SLF4J logger named after ConsulConfigurationProvider.
130 private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)