2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.impl.adapters
 
  22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 
  23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 
  24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 
  25 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 
  26 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 
  27 import org.onap.dcae.collectors.veshv.model.ServiceContext
 
  28 import org.onap.dcae.collectors.veshv.model.routing
 
  29 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
  30 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 
  31 import reactor.core.publisher.Flux
 
  32 import reactor.core.publisher.Mono
 
  33 import reactor.retry.Jitter
 
  34 import reactor.retry.Retry
 
  35 import java.io.StringReader
 
  36 import java.security.MessageDigest
 
  37 import java.time.Duration
 
  39 import java.util.concurrent.atomic.AtomicReference
 
  40 import javax.json.Json
 
  41 import javax.json.JsonObject
 
  45  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 
  48 internal class ConsulConfigurationProvider(private val http: HttpAdapter, // performs the GET requests (see askForConfig)

  49                                            private val url: String, // address requested on every poll

  50                                            private val firstRequestDelay: Duration, // initial delay handed to Flux.interval in invoke()

  51                                            private val requestInterval: Duration, // period handed to Flux.interval in invoke()

  52                                            private val healthState: HealthState, // state is changed whenever a retry is triggered

  55 ) : ConfigurationProvider {
 
  // SHA-256 digest of the last configuration body that was accepted as new. It starts as an empty
  // array, which can never equal a real 32-byte digest, so the first fetched body counts as a change.
  56     private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
 
  // Retry policy derived from retrySpec: on every retry it logs a warning carrying the triggering
  // exception and switches the health state to RETRYING_FOR_DYNAMIC_CONFIGURATION.
  // NOTE(review): the declaration of retrySpec is not visible in this excerpt - presumably a
  // primary-constructor parameter; confirm.
  57     private val retry = retrySpec.doOnRetry {

  58         logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }

  59         healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
 
  // Convenience constructor that takes the URL, first-request delay and request interval from
  // `params` and builds the retry policy: at most MAX_RETRIES retries, a fixed backoff of
  // requestInterval / BACKOFF_INTERVAL_FACTOR, and random jitter.
  // NOTE(review): some arguments of the delegating call, and the start of the retry builder chain,
  // are not visible in this excerpt.
  62     constructor(http: HttpAdapter,

  63                 params: ConfigurationProviderParams) : this(

  65             params.configurationUrl,

  66             params.firstRequestDelay,

  67             params.requestInterval,

  70                     .retryMax(MAX_RETRIES)

  71                     .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) // backoff is a fraction of the polling interval

  72                     .jitter(Jitter.random())
 
  // Produces the stream of collector configurations: starting after firstRequestDelay and then
  // every requestInterval, it fetches the configuration, passes it through filterDifferentValues,
  // parses the body as JSON and maps it to a CollectorConfiguration.
  // NOTE(review): any operators chained after the last map are not visible in this excerpt
  // (the `retry` property is presumably applied there - confirm).
  75     override fun invoke(): Flux<CollectorConfiguration> =

  76             Flux.interval(firstRequestDelay, requestInterval)

  77                     .concatMap { askForConfig() } // concatMap handles one request at a time, in tick order

  78                     .flatMap(::filterDifferentValues)

  79                     .map(::parseJsonResponse)

  80                     .map(::createCollectorConfiguration)
 
  // Issues one GET for `url`, tagged with a freshly generated UUID, and pairs the response body
  // with that id. Mono.defer postpones the work until subscription, so every subscription
  // generates its own invocation id.
  83     private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {

  84         val invocationId = UUID.randomUUID()

  85         http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
 
  // Compares the SHA-256 digest of the fetched body with the digest held in lastConfigurationHash.
  // When they match, a trace message is logged; otherwise the new body is logged at info level,
  // its digest is stored, and the body is emitted via Mono.just.
  // NOTE(review): the value produced by the "no change" branch is not visible in this excerpt -
  // presumably an empty Mono so that unchanged configurations are dropped; confirm.
  // NOTE(review): the stored hash is read with get() and written with set() as two separate
  // steps, not as a single compare-and-set.
  88     private fun filterDifferentValues(configuration: BodyWithInvocationId) =

  89             configuration.body.let { configurationString ->

  90                 configurationString.sha256().let { newHash ->

  91                     if (newHash contentEquals lastConfigurationHash.get()) { // contentEquals compares array contents, not references

  92                         logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {

  93                             "No change detected in consul configuration"

  97                         logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {

  98                             "Obtained new configuration from consul:\n$configurationString"

 100                         lastConfigurationHash.set(newHash)

 101                         Mono.just(configurationString)
 
 // Parses the raw response text into a javax.json JsonObject.
 106     private fun parseJsonResponse(responseString: String): JsonObject =

 107             Json.createReader(StringReader(responseString)).readObject()
 
 // Maps the parsed JSON document to a CollectorConfiguration: reads the array stored under
 // ROUTING_CONFIGURATION_KEY and, for every element, declares a route from the element's
 // DOMAIN_CONFIGURATION_KEY value to its TOPIC_CONFIGURATION_KEY value with fixed partitioning.
 // A NullPointerException raised along the way is rethrown as ParsingException, passing the
 // original exception along.
 // NOTE(review): in javax.json, getJsonArray returns null for a missing key and getString throws
 // NullPointerException for a missing key - presumably what the catch is aimed at; confirm.
 // NOTE(review): the `try` opener and part of the routing builder are not visible in this excerpt.
 109     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =

 111                 val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)

 112                 CollectorConfiguration(

 114                             for (route in routingArray) {

 115                                 val routeObj = route.asJsonObject()

 117                                     fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))

 118                                     toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))

 119                                     withFixedPartitioning()

 124             } catch (e: NullPointerException) {

 125                 throw ParsingException("Failed to parse consul configuration", e)
 
 130         private const val ROUTING_CONFIGURATION_KEY = "collector.routing" // JSON key of the routing array

 131         private const val DOMAIN_CONFIGURATION_KEY = "fromDomain" // per-route key whose value is passed to fromDomain

 132         private const val TOPIC_CONFIGURATION_KEY = "toTopic" // per-route key whose value is passed to toTopic

 134         private const val MAX_RETRIES = 5L // upper bound handed to retryMax in the secondary constructor

 135         private const val BACKOFF_INTERVAL_FACTOR = 30L // retry backoff = requestInterval divided by this factor

 136         private val logger = Logger(ConsulConfigurationProvider::class)
 
 // Returns the SHA-256 digest of this string's bytes (toByteArray() encodes as UTF-8 by default).
 // NOTE(review): the receiver of getInstance is not visible in this excerpt - presumably
 // java.security.MessageDigest, which the file imports; confirm.
 137         private fun String.sha256() =

 139                         .getInstance("SHA-256")

 140                         .digest(toByteArray())
 
 // Response body paired with the UUID that was generated for the request that fetched it.
 144     private data class BodyWithInvocationId(val body: String, val invocationId: UUID)