786bc7cebebf53d2abf8d686f3092a1ad203bc92
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.routing
25 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
26 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
27 import org.slf4j.LoggerFactory
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import reactor.ipc.netty.http.client.HttpClientException
31 import reactor.retry.Retry
32 import reactor.retry.retryExponentialBackoff
33 import java.io.StringReader
34 import java.time.Duration
35 import java.util.*
36 import java.util.concurrent.TimeUnit
37 import java.util.concurrent.atomic.AtomicReference
38 import javax.json.Json
39 import javax.json.JsonObject
40
41
/**
 * [ConfigurationProvider] that first emits a hard-coded default configuration and then
 * keeps emitting configurations fetched over HTTP from Consul.
 *
 * Each response is expected to be a JSON array whose first element contains an integer
 * `ModifyIndex` and a Base64-encoded `Value`. The decoded value must be a JSON object with
 * a `kafkaBootstrapServers` string and a `routing` array of `{fromDomain, toTopic}` objects.
 * A configuration is emitted only when the hash of its encoded value differs from the
 * previously seen one.
 *
 * @param url address the configuration is requested from
 * @param http adapter used to perform the GET requests
 * @param firstRequestDelay delay applied before the first request is sent
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConsulConfigurationProvider(private val url: String,
                                           private val http: HttpAdapter,
                                           private val firstRequestDelay: Duration
) : ConfigurationProvider {

    // NOTE(review): stored as Int because the response is read with getInt(); if the server
    // can return indexes above Int.MAX_VALUE this would need to become a Long - confirm.
    private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
    private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)

    /**
     * @return the default configuration followed by every distinct configuration obtained from Consul
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())

    /** Emits the built-in configuration (single HVRANMEAS route) and logs that it was applied. */
    private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
            CollectorConfiguration(
                    kafkaBootstrapServers = "kafka:9092",
                    routing = routing {
                        defineRoute {
                            fromDomain(HVRANMEAS)
                            toTopic("ves_hvRanMeas")
                            withFixedPartitioning()
                        }
                    }.build())
    ).doOnNext { logger.info("Applied default configuration") }

    /**
     * Builds the endlessly repeating request pipeline.
     *
     * The request is created inside [Mono.defer] so that every re-subscription triggered by
     * `repeat()` reads the current [lastModifyIndex]; building it eagerly would freeze the
     * `index` query parameter at its initial value for all subsequent requests.
     */
    private fun createConsulFlux(): Flux<CollectorConfiguration> =
            Mono.defer { http.get(url, mapOf(Pair("index", lastModifyIndex.get()))) }
                    .doOnError { cause ->
                        // Pass the throwable along so the reason of the failure ends up in the log.
                        logger.error("Encountered an error " +
                                "when trying to acquire configuration from consul. Shutting down..", cause)
                    }
                    .map(::parseJsonResponse)
                    .doOnNext(::updateModifyIndex)
                    .map(::extractEncodedConfiguration)
                    .flatMap(::filterDifferentValues)
                    .map(::decodeConfiguration)
                    .map(::createCollectorConfiguration)
                    .repeat()
                    .delaySubscription(firstRequestDelay)

    /**
     * Parses the raw response as a JSON array and returns its first element.
     *
     * @throws NoSuchElementException when the array is empty
     */
    private fun parseJsonResponse(responseString: String): JsonObject =
            Json.createReader(StringReader(responseString))
                    .use { reader -> reader.readArray() }
                    .first()
                    .asJsonObject()

    /** Remembers the `ModifyIndex` of the response so the next request can send it back. */
    private fun updateModifyIndex(response: JsonObject) =
            lastModifyIndex.set(response.getInt("ModifyIndex"))

    /** Returns the (still Base64-encoded) `Value` field of the response. */
    private fun extractEncodedConfiguration(response: JsonObject): String =
            response.getString("Value")

    /**
     * Lets the value through only when its hash differs from the previously stored one.
     *
     * @return a [Mono] with [base64Value], or an empty [Mono] when the hash is unchanged
     */
    private fun filterDifferentValues(base64Value: String): Mono<String> {
        val newHash = hashOf(base64Value)
        // getAndSet swaps and reads in one step; writing an identical hash back is a no-op.
        val previousHash = lastConfigurationHash.getAndSet(newHash)
        return if (previousHash == newHash) Mono.empty() else Mono.just(base64Value)
    }

    private fun hashOf(str: String) = str.hashCode()

    /** Base64-decodes the configuration, logs it and parses it as a JSON object. */
    private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
        val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration))
        logger.info("Obtained new configuration from consul:\n$decodedValue")
        return Json.createReader(StringReader(decodedValue)).use { reader -> reader.readObject() }
    }

    /**
     * Maps the decoded JSON onto a [CollectorConfiguration], defining one fixed-partitioning
     * route per element of the `routing` array.
     *
     * @throws IllegalArgumentException when a route refers to a domain number unknown to `forNumber`
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
        // Named `routes` so it does not shadow the imported `routing` builder function.
        val routes = configuration.getJsonArray("routing")

        return CollectorConfiguration(
                kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
                routing = routing {
                    for (route in routes) {
                        val routeObj = route.asJsonObject()
                        val domainNumber = routeObj.getInt("fromDomain")
                        val domain = requireNotNull(forNumber(domainNumber)) {
                            "Unknown domain number in routing configuration: $domainNumber"
                        }
                        defineRoute {
                            fromDomain(domain)
                            toTopic(routeObj.getString("toTopic"))
                            withFixedPartitioning()
                        }
                    }
                }.build()
        )
    }

    companion object {
        private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
    }
}
133