621c63f8b511be04435af82a78899e8ef5504bf3
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.dcae.collectors.veshv.model.routing
25 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
26 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
27 import org.slf4j.LoggerFactory
28 import reactor.core.publisher.Flux
29 import reactor.core.publisher.Mono
30 import java.io.StringReader
31 import java.time.Duration
32 import java.util.*
33 import java.util.concurrent.atomic.AtomicReference
34 import javax.json.Json
35 import javax.json.JsonObject
36
37
38 /**
39  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
40  * @since May 2018
41  */
42 internal class ConsulConfigurationProvider(private val url: String,
43                                            private val http: HttpAdapter,
44                                            private val firstRequestDelay: Duration
45 ) : ConfigurationProvider {
46
47     private val lastModifyIndex: AtomicReference<Int> = AtomicReference(0)
48     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
49
50     override fun invoke(): Flux<CollectorConfiguration> =
51             Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
52
53     private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
54             CollectorConfiguration(
55                     kafkaBootstrapServers = "kafka:9092",
56                     routing = routing {
57                         defineRoute {
58                             fromDomain(HVRANMEAS)
59                             toTopic("ves_hvRanMeas")
60                             withFixedPartitioning()
61                         }
62                     }.build())
63     ).doOnNext { logger.info("Applied default configuration") }
64
65     private fun createConsulFlux(): Flux<CollectorConfiguration> =
66             http.get(url, mapOf(Pair("index", lastModifyIndex.get())))
67                     .doOnError {
68                         logger.error("Encountered an error " +
69                                 "when trying to acquire configuration from consul. Shutting down..")
70                     }
71                     .map(::parseJsonResponse)
72                     .doOnNext(::updateModifyIndex)
73                     .map(::extractEncodedConfiguration)
74                     .flatMap(::filterDifferentValues)
75                     .map(::decodeConfiguration)
76                     .map(::createCollectorConfiguration)
77                     .repeat()
78                     .delaySubscription(firstRequestDelay)
79
80     private fun parseJsonResponse(responseString: String): JsonObject =
81             Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
82
83     private fun updateModifyIndex(response: JsonObject) =
84             lastModifyIndex.set(response.getInt("ModifyIndex"))
85
86     private fun extractEncodedConfiguration(response: JsonObject): String =
87             response.getString("Value")
88
89     private fun filterDifferentValues(base64Value: String): Mono<String> {
90         val newHash = hashOf(base64Value)
91         return if (newHash == lastConfigurationHash.get()) {
92             Mono.empty()
93         } else {
94             lastConfigurationHash.set(newHash)
95             Mono.just(base64Value)
96         }
97     }
98
99     private fun hashOf(str: String) = str.hashCode()
100
101     private fun decodeConfiguration(encodedConfiguration: String): JsonObject {
102         val decodedValue = String(Base64.getDecoder().decode(encodedConfiguration))
103         logger.info("Obtained new configuration from consul:\n$decodedValue")
104         return Json.createReader(StringReader(decodedValue)).readObject()
105     }
106
107     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
108         val routing = configuration.getJsonArray("routing")
109
110         return CollectorConfiguration(
111                 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
112                 routing = org.onap.dcae.collectors.veshv.model.routing {
113                     for (route in routing) {
114                         val routeObj = route.asJsonObject()
115                         defineRoute {
116                             fromDomain(forNumber(routeObj.getInt("fromDomain")))
117                             toTopic(routeObj.getString("toTopic"))
118                             withFixedPartitioning()
119                         }
120                     }
121                 }.build()
122         )
123     }
124
125     companion object {
126         private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
127     }
128 }
129