d8ea45d63bc5516ebdf75ebf46866c0120328c0b
[dcaegen2/collectors/hv-ves.git] / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / ConsulConfigurationProvider.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
25 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
26 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
27 import org.onap.dcae.collectors.veshv.utils.logging.Logger
28 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
29 import reactor.core.publisher.Flux
30 import reactor.core.publisher.Mono
31 import reactor.retry.Jitter
32 import reactor.retry.Retry
33 import java.io.StringReader
34 import java.time.Duration
35 import java.util.*
36 import java.util.concurrent.atomic.AtomicReference
37 import javax.json.Json
38 import javax.json.JsonObject
39
40
/**
 * [ConfigurationProvider] that periodically fetches the collector configuration over HTTP
 * (from a Consul key-value endpoint) and emits a [CollectorConfiguration] whenever the
 * fetched value differs from the previously seen one.
 *
 * The response body is expected to be a JSON array whose first element is an object with
 * a Base64-encoded `Value` field. The decoded value must be a JSON object containing
 * `dmaap.kafkaBootstrapServers` and a `collector.routing` array.
 *
 * @param http adapter used to perform the GET request
 * @param url address the configuration is fetched from
 * @param firstRequestDelay delay before the first request is issued
 * @param requestInterval period between subsequent requests
 * @param healthState health state holder notified each time a retry is performed
 * @param retrySpec retry policy applied to the whole polling pipeline
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                                           private val url: String,
                                           private val firstRequestDelay: Duration,
                                           private val requestInterval: Duration,
                                           private val healthState: HealthState,
                                           retrySpec: Retry<Any>

) : ConfigurationProvider {

    // Hash of the most recently seen Base64 payload; used to suppress unchanged configurations.
    private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)

    // Supplied retry policy extended with logging and a health state update on every retry.
    private val retry = retrySpec
            .doOnRetry {
                logger.warn("Could not get fresh configuration", it.exception())
                healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
            }

    /**
     * Creates a provider from [params], using the shared [HealthState.INSTANCE] and a default
     * retry policy: at most [MAX_RETRIES] retries with a jittered fixed backoff equal to the
     * request interval divided by [BACKOFF_INTERVAL_FACTOR].
     */
    constructor(http: HttpAdapter,
                params: ConfigurationProviderParams) : this(
            http,
            params.configurationUrl,
            params.firstRequestDelay,
            params.requestInterval,
            HealthState.INSTANCE,
            Retry.any<Any>()
                    .retryMax(MAX_RETRIES)
                    .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
                    .jitter(Jitter.random())
    )

    /**
     * Returns a stream that polls the configuration endpoint at the configured interval and
     * emits a new [CollectorConfiguration] only when the fetched payload has changed.
     * Any error in the pipeline is handled by the retry policy.
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.interval(firstRequestDelay, requestInterval)
                    .flatMap { askForConfig() }
                    .map(::parseJsonResponse)
                    .map(::extractEncodedConfiguration)
                    .flatMap(::filterDifferentValues)
                    .map(::decodeConfiguration)
                    .map(::createCollectorConfiguration)
                    .retryWhen(retry)

    /** Issues a single GET request for the configuration. */
    private fun askForConfig(): Mono<String> = http.get(url)

    /**
     * Parses the response as a JSON array and returns its first element as an object.
     *
     * @throws NoSuchElementException when the array is empty
     */
    private fun parseJsonResponse(responseString: String): JsonObject =
            Json.createReader(StringReader(responseString))
                    .use { it.readArray() }
                    .firstOrNull()
                    ?.asJsonObject()
                    ?: throw NoSuchElementException("Consul response contains no entries")

    /** Extracts the Base64-encoded configuration stored under the `Value` key. */
    private fun extractEncodedConfiguration(response: JsonObject): String =
            response.getString("Value")

    /**
     * Passes [base64Value] through only when its hash differs from the last seen one.
     * The stored hash is swapped in a single atomic step, because responses may be
     * delivered concurrently by the surrounding `flatMap`.
     */
    private fun filterDifferentValues(base64Value: String): Mono<String> {
        val newHash = hashOf(base64Value)
        val previousHash = lastConfigurationHash.getAndSet(newHash)
        return if (newHash == previousHash) Mono.empty() else Mono.just(base64Value)
    }

    private fun hashOf(str: String) = str.hashCode()

    /** Decodes the Base64 payload as UTF-8 text, logs it and parses it as a JSON object. */
    private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
        // Decode explicitly as UTF-8 so the result does not depend on the platform default charset.
        val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration), Charsets.UTF_8)
        logger.info("Obtained new configuration from consul:\n$decodedValue")
        return Json.createReader(StringReader(decodedValue)).use { it.readObject() }
    }

    /**
     * Builds a [CollectorConfiguration] from the decoded JSON: the Kafka bootstrap servers and
     * one fixed-partitioning route per entry of the `collector.routing` array.
     *
     * @throws IllegalArgumentException when a route refers to an unknown domain number
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
        val routing = configuration.getJsonArray("collector.routing")

        return CollectorConfiguration(
                kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
                // Fully qualified because the local `routing` value shadows the DSL function.
                routing = org.onap.dcae.collectors.veshv.model.routing {
                    for (route in routing) {
                        val routeObj = route.asJsonObject()
                        val domainNumber = routeObj.getInt("fromDomain")
                        // Protobuf's forNumber returns null for numbers that map to no enum constant.
                        val domain = forNumber(domainNumber)
                                ?: throw IllegalArgumentException(
                                        "Unknown domain number in routing: $domainNumber")
                        defineRoute {
                            fromDomain(domain)
                            toTopic(routeObj.getString("toTopic"))
                            withFixedPartitioning()
                        }
                    }
                }.build()
        )
    }

    companion object {
        private const val MAX_RETRIES = 5
        private const val BACKOFF_INTERVAL_FACTOR = 30L
        private val logger = Logger(ConsulConfigurationProvider::class)
    }
}
134