6f04c95c99ca232bc131e31fc21c79d238900360
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
25 import org.onap.dcae.collectors.veshv.utils.logging.Logger
26 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
27 import org.slf4j.LoggerFactory
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import reactor.retry.Jitter
31 import reactor.retry.Retry
32 import java.io.StringReader
33 import java.time.Duration
34 import java.util.*
35 import java.util.concurrent.atomic.AtomicReference
36 import javax.json.Json
37 import javax.json.JsonObject
38
39
/**
 * [ConfigurationProvider] which polls an HTTP endpoint (a Consul key-value entry) on a fixed schedule
 * and emits a [CollectorConfiguration] every time the stored value differs from the previously seen one.
 *
 * The response body is expected to be a JSON array whose first element is an object with a Base64-encoded
 * string field `Value`. The decoded value must be a JSON object containing `kafkaBootstrapServers` (string)
 * and `routing` (array of objects with an integer `fromDomain` and a string `toTopic`).
 *
 * The last seen encoded value is kept in an instance field, so it is shared by every subscription
 * to the [Flux] returned from [invoke] and survives re-subscriptions triggered by the retry policy.
 *
 * @param http adapter used to perform the GET request
 * @param url address of the configuration entry
 * @param firstRequestDelay delay before the first request is issued
 * @param requestInterval period between subsequent requests
 * @param retrySpec retry policy applied to the whole polling pipeline
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                                           private val url: String,
                                           private val firstRequestDelay: Duration,
                                           private val requestInterval: Duration,
                                           retrySpec: Retry<Any>
) : ConfigurationProvider {

    // The encoded value itself is remembered (not just its hash): a 32-bit hash can collide for two
    // different configurations, which would cause a genuine update to be silently dropped.
    // `null` means "nothing has been seen yet".
    private val lastEncodedConfiguration = AtomicReference<String?>()

    private val retry = retrySpec
            .doOnRetry {
                logger.warn("Could not get fresh configuration", it.exception())
            }

    /**
     * Creates a provider from [params], retrying at most [MAX_RETRIES] times with a fixed backoff of
     * `requestInterval / BACKOFF_INTERVAL_FACTOR` and random jitter.
     */
    constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
            http,
            params.configurationUrl,
            params.firstRequestDelay,
            params.requestInterval,
            Retry.any<Any>()
                    .retryMax(MAX_RETRIES)
                    .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
                    .jitter(Jitter.random()))

    /**
     * Builds the polling pipeline: request, parse, extract `Value`, drop unchanged values, decode, map.
     *
     * Any error raised by a stage is handed over to the configured retry policy. Note that the
     * "last seen" value is recorded before decoding, so a value which fails to decode is not
     * processed again until the stored value changes.
     *
     * @return stream of configurations, one element per detected change
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.interval(firstRequestDelay, requestInterval)
                    .flatMap { askForConfig() }
                    .map(::parseJsonResponse)
                    .map(::extractEncodedConfiguration)
                    .flatMap(::filterDifferentValues)
                    .map(::decodeConfiguration)
                    .map(::createCollectorConfiguration)
                    .retryWhen(retry)

    /** Issues a single GET request for the configuration entry. */
    private fun askForConfig(): Mono<String> = http.get(url)

    /**
     * Parses the raw response as a JSON array and returns its first element as an object.
     * Fails if the array is empty or the first element is not an object.
     */
    private fun parseJsonResponse(responseString: String): JsonObject =
            Json.createReader(StringReader(responseString)).use { reader ->
                reader.readArray().first().asJsonObject()
            }

    /** Returns the (still Base64-encoded) `Value` field of the response entry. */
    private fun extractEncodedConfiguration(response: JsonObject): String =
            response.getString("Value")

    /**
     * Passes [base64Value] downstream only when it differs from the previously seen value.
     * The value is recorded and compared in one atomic step.
     *
     * @return a [Mono] with the value when it has changed, an empty [Mono] otherwise
     */
    private fun filterDifferentValues(base64Value: String): Mono<String> =
            if (lastEncodedConfiguration.getAndSet(base64Value) == base64Value) {
                Mono.empty()
            } else {
                Mono.just(base64Value)
            }

    /** Decodes the Base64 payload as UTF-8 text, logs it and parses it as a JSON object. */
    private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
        val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration), Charsets.UTF_8)
        logger.info("Obtained new configuration from consul:\n$decodedValue")
        return Json.createReader(StringReader(decodedValue)).use { reader -> reader.readObject() }
    }

    /**
     * Maps the decoded JSON document onto a [CollectorConfiguration]: one route with fixed
     * partitioning is defined for every entry of the `routing` array.
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
        val routingEntries = configuration.getJsonArray("routing")

        return CollectorConfiguration(
                kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
                routing = org.onap.dcae.collectors.veshv.model.routing {
                    for (entry in routingEntries) {
                        val routeObj = entry.asJsonObject()
                        defineRoute {
                            // NOTE(review): forNumber presumably yields null for an unknown domain
                            // number - confirm how fromDomain reacts to that.
                            fromDomain(forNumber(routeObj.getInt("fromDomain")))
                            toTopic(routeObj.getString("toTopic"))
                            withFixedPartitioning()
                        }
                    }
                }.build()
        )
    }

    companion object {
        private const val MAX_RETRIES = 5
        private const val BACKOFF_INTERVAL_FACTOR = 30L
        private val logger = Logger(ConsulConfigurationProvider::class)
    }
}
127