727f025bc437cda0908f4e3ed1bae562e4f3625d
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.routing
25 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
26 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
27 import org.slf4j.LoggerFactory
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import reactor.ipc.netty.http.client.HttpClientException
31 import reactor.retry.Retry
32 import reactor.retry.retryExponentialBackoff
33 import java.io.StringReader
34 import java.time.Duration
35 import java.util.*
36 import java.util.concurrent.TimeUnit
37 import java.util.concurrent.atomic.AtomicReference
38 import javax.json.Json
39 import javax.json.JsonObject
40
41
42 /**
43  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
44  * @since May 2018
45  */
46 internal class ConsulConfigurationProvider(private val url: String,
47                                            private val http: HttpAdapter,
48                                            private val firstRequestDelay: Duration
49 ) : ConfigurationProvider {
50
51     private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
52     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
53
54     override fun invoke(): Flux<CollectorConfiguration> =
55             Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
56
57     private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
58             CollectorConfiguration(
59                     kafkaBootstrapServers = "kafka:9092",
60                     routing = routing {
61                         defineRoute {
62                             fromDomain(HVRANMEAS)
63                             toTopic("ves_hvRanMeas")
64                             withFixedPartitioning()
65                         }
66                     }.build())
67     ).doOnNext { logger.info("Applied default configuration") }.delayElement(firstRequestDelay)
68
69     private fun createConsulFlux(): Flux<CollectorConfiguration> =
70             http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
71                     .doOnError {
72                         logger.error("Encountered an error " +
73                                 "when trying to acquire configuration from consul. Shutting down..")
74                     }
75                     .map(::parseJsonResponse)
76                     .doOnNext(::updateModifyIndex)
77                     .map(::extractEncodedConfiguration)
78                     .flatMap(::filterDifferentValues)
79                     .map(::decodeConfiguration)
80                     .map(::createCollectorConfiguration)
81                     .repeat()
82
83     private fun parseJsonResponse(responseString: String): JsonObject =
84             Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
85
86     private fun updateModifyIndex(response: JsonObject) =
87             lastModifyIndex.set(response.getInt("ModifyIndex"))
88
89     private fun extractEncodedConfiguration(response: JsonObject): String =
90             response.getString("Value")
91
92     private fun filterDifferentValues(base64Value: String): Mono<String> {
93         val newHash = hashOf(base64Value)
94         return if (newHash == lastConfigurationHash.get()) {
95             Mono.empty()
96         } else {
97             lastConfigurationHash.set(newHash)
98             Mono.just(base64Value)
99         }
100     }
101
102     private fun hashOf(str: String) = str.hashCode()
103
104     private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
105         val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration))
106         logger.info("Obtained new configuration from consul:\n$decodedValue")
107         return Json.createReader(StringReader(decodedValue)).readObject()
108     }
109
110     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
111         val routing = configuration.getJsonArray("routing")
112
113         return CollectorConfiguration(
114                 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
115                 routing = org.onap.dcae.collectors.veshv.model.routing {
116                     for (route in routing) {
117                         val routeObj = route.asJsonObject()
118                         defineRoute {
119                             fromDomain(forNumber(routeObj.getInt("fromDomain")))
120                             toTopic(routeObj.getString("toTopic"))
121                             withFixedPartitioning()
122                         }
123                     }
124                 }.build()
125         )
126     }
127
128     companion object {
129         private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
130     }
131 }
132