2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.impl.adapters
 
  22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 
  23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 
  24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 
  25 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 
  26 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 
  27 import org.onap.dcae.collectors.veshv.model.ServiceContext
 
  28 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
  29 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 
  30 import reactor.core.publisher.Flux
 
  31 import reactor.core.publisher.Mono
 
  32 import reactor.retry.Jitter
 
  33 import reactor.retry.Retry
 
  34 import java.io.StringReader
 
  35 import java.security.MessageDigest
 
  36 import java.time.Duration
 
  38 import java.util.concurrent.atomic.AtomicReference
 
  39 import javax.json.Json
 
  40 import javax.json.JsonObject
 
  44  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 
  // ConfigurationProvider implementation that requests configuration over HTTP
  // from `url` (see askForConfig) and publishes it as a stream (see invoke).
  // NOTE(review): this listing elides some source lines (the embedded numbering
  // skips 52-53, 57 and 60), so the parameter list and the `retry` initializer
  // shown here may be incomplete; `retrySpec` is not declared on any visible line.
  47 internal class ConsulConfigurationProvider(private val http: HttpAdapter,
 
  48                                            private val url: String,
 
  49                                            private val firstRequestDelay: Duration,
 
  50                                            private val requestInterval: Duration,
 
  51                                            private val healthState: HealthState,
 
  54 ) : ConfigurationProvider {
 
  // SHA-256 hash of the most recently accepted response body; starts as an
  // empty array and is compared/updated in filterDifferentValues.
  55     private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
 
  // Retry specification derived from `retrySpec`.
  // NOTE(review): the two statements below (warn log + health state change to
  // RETRYING_FOR_CONSUL_CONFIGURATION) look like the body of a callback whose
  // opening line is elided — presumably a retry hook; confirm against the full file.
  56     private val retry = retrySpec
 
  58                 logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
 
  59                 healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
 
  // Secondary constructor: delegates to the primary constructor, taking the
  // configuration URL, first-request delay and request interval from `params`.
  // The visible tail of the argument list configures a retry spec with at most
  // MAX_RETRIES retries, a fixed backoff of requestInterval / BACKOFF_INTERVAL_FACTOR
  // and random jitter.
  // NOTE(review): embedded line numbers 64 and 68-69 are elided, so some
  // delegation arguments (and the start of the retry builder chain) are not shown.
  62     constructor(http: HttpAdapter,
 
  63                 params: ConfigurationProviderParams) : this(
 
  65             params.configurationUrl,
 
  66             params.firstRequestDelay,
 
  67             params.requestInterval,
 
  70                     .retryMax(MAX_RETRIES)
 
  71                     .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
 
  72                     .jitter(Jitter.random())
 
  // Produces the stream of collector configurations.
  // A tick is emitted after firstRequestDelay and then every requestInterval;
  // each tick triggers askForConfig() (concatMap handles the requests one at a
  // time, in order). Responses are passed through filterDifferentValues, parsed
  // into a JsonObject and converted to a CollectorConfiguration.
  // NOTE(review): embedded line 81 is elided, so the operator chain may continue
  // past the last visible `.map`; the `retry` property is not referenced on any
  // visible line.
  75     override fun invoke(): Flux<CollectorConfiguration> =
 
  76             Flux.interval(firstRequestDelay, requestInterval)
 
  77                     .concatMap { askForConfig() }
 
  78                     .flatMap(::filterDifferentValues)
 
  79                     .map(::parseJsonResponse)
 
  80                     .map(::createCollectorConfiguration)
 
  83     private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
 
  84         val invocationId = UUID.randomUUID()
 
  85         http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
 
  // Decides whether a fetched response body should be propagated downstream.
  // The body's SHA-256 hash is compared (by content) with lastConfigurationHash:
  //  - equal: a trace message "No change detected in consul configuration" is logged;
  //  - different: the new body is logged at info level, the stored hash is
  //    replaced with the new one and the body is emitted via Mono.just.
  // Both log calls are tagged with the invocation id of the originating request.
  // NOTE(review): embedded lines 94-96 are elided, so the value returned in the
  // "no change" case and the `else` line itself are not visible here —
  // presumably an empty Mono; confirm against the full file.
  88     private fun filterDifferentValues(configuration: BodyWithInvocationId) =
 
  89             configuration.body.let { configurationString ->
 
  90                 configurationString.sha256().let { newHash ->
 
  91                     if (newHash contentEquals lastConfigurationHash.get()) {
 
  92                         logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
 
  93                             "No change detected in consul configuration"
 
  97                         logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
 
  98                             "Obtained new configuration from consul:\n${configurationString}"
 
 100                         lastConfigurationHash.set(newHash)
 
 101                         Mono.just(configurationString)
 
 106     private fun parseJsonResponse(responseString: String): JsonObject =
 
 107             Json.createReader(StringReader(responseString)).readObject()
 
 // Converts the parsed JSON document into a CollectorConfiguration.
 // Reads the Kafka bootstrap servers from the "dmaap.kafkaBootstrapServers" string
 // and iterates over the "collector.routing" array; for every array element
 // (treated as a JSON object) its "fromDomain" and "toTopic" strings are passed to
 // the routing builder together with withFixedPartitioning().
 // NOTE(review): embedded lines 117 and 121-127 are elided, so the call that wraps
 // the three per-route builder lines and the end of the routing expression are
 // not visible here.
 109     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
 
 110         val routing = configuration.getJsonArray("collector.routing")
 
 112         return CollectorConfiguration(
 
 113                 kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
 
 114                 routing = org.onap.dcae.collectors.veshv.model.routing {
 
 115                     for (route in routing) {
 
 116                         val routeObj = route.asJsonObject()
 
 118                             fromDomain(routeObj.getString("fromDomain"))
 
 119                             toTopic(routeObj.getString("toTopic"))
 
 120                             withFixedPartitioning()
 
 // Upper bound passed to retryMax(...) in the secondary constructor's retry spec.
 128         private const val MAX_RETRIES = 5L
 
 // Divisor applied to params.requestInterval to obtain the fixed retry backoff
 // in the secondary constructor.
 129         private const val BACKOFF_INTERVAL_FACTOR = 30L
 
 // Logger shared by all instances of this class.
 130         private val logger = Logger(ConsulConfigurationProvider::class)
 
 132         private fun String.sha256() =
 
 134                         .getInstance("SHA-256")
 
 135                         .digest(toByteArray())
 
 139     private data class BodyWithInvocationId(val body: String, val invocationId: UUID)