bbaa47c49eb1a98b1b0cc0932be294f240fd75d4
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / ConsulConfigurationProvider.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
25 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
26 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
27 import org.onap.dcae.collectors.veshv.utils.logging.Logger
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import reactor.retry.Jitter
31 import reactor.retry.Retry
32 import java.io.StringReader
33 import java.time.Duration
34 import java.util.concurrent.atomic.AtomicReference
35 import javax.json.Json
36 import javax.json.JsonObject
37
38
/**
 * [ConfigurationProvider] that periodically fetches a JSON document over HTTP and emits a
 * [CollectorConfiguration] whenever the fetched document differs from the one seen previously.
 *
 * Change detection compares [String.hashCode] of consecutive responses, so two different
 * documents with colliding hashes would be treated as unchanged.
 *
 * @param http adapter used to perform the HTTP GET
 * @param url address the configuration is fetched from
 * @param firstRequestDelay delay before the first fetch
 * @param requestInterval period between subsequent fetches
 * @param healthState health indicator switched to
 * [HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION] on every retry
 * @param retrySpec retry policy applied to the whole polling pipeline
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                                           private val url: String,
                                           private val firstRequestDelay: Duration,
                                           private val requestInterval: Duration,
                                           private val healthState: HealthState,
                                           retrySpec: Retry<Any>

) : ConfigurationProvider {

    // Hash of the most recently accepted response. Starts at 0, which is also the hash of an
    // empty string, so an empty first response is reported as "no change".
    private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)

    // The supplied retry policy, extended to log the failure and update the health state
    // each time a retry is attempted.
    private val retry = retrySpec
            .doOnRetry {
                logger.withWarn { log("Could not get fresh configuration", it.exception()) }
                healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
            }

    /**
     * Convenience constructor deriving all settings from [params].
     *
     * Uses [HealthState.INSTANCE] and a retry policy of at most [MAX_RETRIES] attempts with a
     * fixed, randomly jittered backoff of `requestInterval / BACKOFF_INTERVAL_FACTOR`.
     */
    constructor(http: HttpAdapter,
                params: ConfigurationProviderParams) : this(
            http,
            params.configurationUrl,
            params.firstRequestDelay,
            params.requestInterval,
            HealthState.INSTANCE,
            Retry.any<Any>()
                    .retryMax(MAX_RETRIES)
                    .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
                    .jitter(Jitter.random())
    )

    /**
     * Builds the polling pipeline: tick on an interval, fetch the document, drop it when it is
     * unchanged, parse it as JSON and map it to a [CollectorConfiguration].
     *
     * Errors raised by any of these steps are handed to the configured retry policy.
     *
     * @return stream of configurations, one element per detected change
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.interval(firstRequestDelay, requestInterval)
                    // concatMap keeps the fetches sequential and in tick order
                    .concatMap { askForConfig() }
                    .flatMap(::filterDifferentValues)
                    .map(::parseJsonResponse)
                    .map(::createCollectorConfiguration)
                    .retryWhen(retry)

    /** Issues a single HTTP GET for [url]. */
    private fun askForConfig(): Mono<String> = http.get(url)

    /**
     * Passes [configurationString] through only when its hash differs from the last accepted one.
     *
     * The stored hash is updated here, before the document is parsed.
     * NOTE(review): as a consequence a document that later fails to parse is not re-processed
     * while it stays unchanged - confirm this is intended.
     *
     * @return a [Mono] with the document when it changed, an empty [Mono] otherwise
     */
    private fun filterDifferentValues(configurationString: String) =
            hashOf(configurationString).let {
                if (it == lastConfigurationHash.get()) {
                    logger.trace { "No change detected in consul configuration" }
                    Mono.empty()
                } else {
                    logger.info { "Obtained new configuration from consul:\n$configurationString" }
                    lastConfigurationHash.set(it)
                    Mono.just(configurationString)
                }
            }

    /** Fingerprint used for change detection; currently plain [String.hashCode]. */
    private fun hashOf(str: String) = str.hashCode()

    /**
     * Parses [responseString] as a single JSON object.
     *
     * The JSON reader is closed via `use` regardless of whether parsing succeeds.
     */
    private fun parseJsonResponse(responseString: String): JsonObject =
            Json.createReader(StringReader(responseString)).use { it.readObject() }

    /**
     * Maps the parsed document to a [CollectorConfiguration].
     *
     * Reads the `dmaap.kafkaBootstrapServers` string and the `collector.routing` array; every
     * array element is read as an object with `fromDomain` and `toTopic` strings and becomes
     * one route with fixed partitioning.
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
        val routing = configuration.getJsonArray("collector.routing")

        return CollectorConfiguration(
                kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
                // Fully qualified because the local `routing` value shadows the builder function.
                routing = org.onap.dcae.collectors.veshv.model.routing {
                    for (route in routing) {
                        val routeObj = route.asJsonObject()
                        defineRoute {
                            fromDomain(routeObj.getString("fromDomain"))
                            toTopic(routeObj.getString("toTopic"))
                            withFixedPartitioning()
                        }
                    }
                }.build()
        )
    }

    companion object {
        // Upper bound on retries used by the params-based constructor.
        private const val MAX_RETRIES = 5L
        // The retry backoff is the request interval divided by this factor.
        private const val BACKOFF_INTERVAL_FACTOR = 30L
        private val logger = Logger(ConsulConfigurationProvider::class)
    }
}
123