aca0e7e9441b8b39c9d0c34d7558af7883c71c10
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.routing
25 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
26 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
27 import org.slf4j.LoggerFactory
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import java.io.StringReader
31 import java.time.Duration
32 import java.util.*
33 import java.util.concurrent.atomic.AtomicReference
34 import javax.json.Json
35 import javax.json.JsonObject
36
37
38 /**
39  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
40  * @since May 2018
41  */
42 internal class ConsulConfigurationProvider(private val url: String,
43                                            private val http: HttpAdapter,
44                                            private val firstRequestDelay: Duration,
45                                            private val requestInterval: Duration
46 ) : ConfigurationProvider {
47
48     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
49
50     override fun invoke(): Flux<CollectorConfiguration> =
51             Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
52
53     private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
54             CollectorConfiguration(
55                     kafkaBootstrapServers = "kafka:9092",
56                     routing = routing {
57                         defineRoute {
58                             fromDomain(HVRANMEAS)
59                             toTopic("ves_hvRanMeas")
60                             withFixedPartitioning()
61                         }
62                     }.build())
63     ).doOnNext { logger.info("Applied default configuration") }
64
65     private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
66             .interval(firstRequestDelay, requestInterval)
67             .flatMap { http.get(url) }
68             .doOnError {
69                 logger.error("Encountered an error " +
70                         "when trying to acquire configuration from consul. Shutting down..")
71             }
72             .map(::parseJsonResponse)
73             .map(::extractEncodedConfiguration)
74             .flatMap(::filterDifferentValues)
75             .map(::decodeConfiguration)
76             .map(::createCollectorConfiguration)
77
78     private fun parseJsonResponse(responseString: String): JsonObject =
79             Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
80
81     private fun extractEncodedConfiguration(response: JsonObject): String =
82             response.getString("Value")
83
84     private fun filterDifferentValues(base64Value: String): Mono<String> {
85         val newHash = hashOf(base64Value)
86         return if (newHash == lastConfigurationHash.get()) {
87             Mono.empty()
88         } else {
89             lastConfigurationHash.set(newHash)
90             Mono.just(base64Value)
91         }
92     }
93
94     private fun hashOf(str: String) = str.hashCode()
95
96     private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
97         val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration))
98         logger.info("Obtained new configuration from consul:\n$decodedValue")
99         return Json.createReader(StringReader(decodedValue)).readObject()
100     }
101
102     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
103         val routing = configuration.getJsonArray("routing")
104
105         return CollectorConfiguration(
106                 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
107                 routing = org.onap.dcae.collectors.veshv.model.routing {
108                     for (route in routing) {
109                         val routeObj = route.asJsonObject()
110                         defineRoute {
111                             fromDomain(forNumber(routeObj.getInt("fromDomain")))
112                             toTopic(routeObj.getString("toTopic"))
113                             withFixedPartitioning()
114                         }
115                     }
116                 }.build()
117         )
118     }
119
120     companion object {
121         private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
122     }
123 }
124