04e4927df285d629522382dffa41a746008d13ac
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
24 import org.onap.ves.VesEventV5
25 import org.slf4j.LoggerFactory
26 import reactor.core.publisher.Flux
27 import java.io.StringReader
28 import java.time.Duration
29 import java.util.*
30 import java.util.concurrent.atomic.AtomicReference
31 import javax.json.Json
32 import javax.json.JsonObject
33
34
35 /**
36  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
37  * @since May 2018
38  */
39 internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
40     : ConfigurationProvider {
41
42
43     private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
44     private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
45
46     override fun invoke(): Flux<CollectorConfiguration> =
47             Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
48                     .flatMap { http.getResponse(url) }
49                     .filter { body -> body.hashCode() != lastConfigurationHash.get() }
50                     .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
51                     .map { str -> getConfigurationJson(str) }
52                     .map { json -> createCollectorConfiguration(json) }
53
54     private fun getConfigurationJson(str: String): JsonObject {
55         val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
56         val decodedValue = String(
57                 Base64.getDecoder().decode(response.getString("Value")))
58         logger.info("Obtained new configuration from consul:\n$decodedValue")
59         return Json.createReader(StringReader(decodedValue)).readObject()
60     }
61
62     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
63
64         val routing = configuration.getJsonObject("routing")
65
66         return CollectorConfiguration(
67                 kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
68                 routing = org.onap.dcae.collectors.veshv.model.routing {
69                     defineRoute {
70                         fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
71                         toTopic(routing.getString("toTopic"))
72                         withFixedPartitioning()
73                     }
74                 }.build()
75         )
76     }
77
78     companion object {
79         private const val REFRESH_INTERVAL_MINUTES: Long = 5
80         private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
81     }
82 }