Merge "Assure coverage is checked for all modules"
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / ConsulConfigurationProvider.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
23 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
24 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
25 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
26 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
27 import org.onap.dcae.collectors.veshv.model.ServiceContext
28 import org.onap.dcae.collectors.veshv.model.routing
29 import org.onap.dcae.collectors.veshv.utils.logging.Logger
30 import org.onap.dcae.collectors.veshv.utils.logging.Marker
31 import reactor.core.publisher.Flux
32 import reactor.core.publisher.Mono
33 import reactor.retry.Jitter
34 import reactor.retry.Retry
35 import java.io.StringReader
36 import java.security.MessageDigest
37 import java.time.Duration
38 import java.util.*
39 import java.util.concurrent.atomic.AtomicReference
40 import javax.json.Json
41 import javax.json.JsonObject
42
43
/**
 * [ConfigurationProvider] which polls an HTTP endpoint (Consul, according to the log messages)
 * and publishes a new [CollectorConfiguration] every time the returned document changes.
 *
 * The first request is issued after [firstRequestDelay] and then repeated every [requestInterval].
 * A response is propagated only when the SHA-256 digest of its body differs from the digest of
 * the previously seen body. Any error raised in the pipeline is handled by the supplied retry
 * specification; each retry is logged as a warning and reported to [healthState].
 *
 * @param http adapter used to perform the GET requests
 * @param url address of the configuration document
 * @param firstRequestDelay delay before the first request
 * @param requestInterval period between consecutive requests
 * @param healthState receives [HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION] on every retry
 * @param retrySpec retry policy applied to the whole polling pipeline
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                                           private val url: String,
                                           private val firstRequestDelay: Duration,
                                           private val requestInterval: Duration,
                                           private val healthState: HealthState,
                                           retrySpec: Retry<Any>

) : ConfigurationProvider {
    /** Digest of the most recently accepted response body; starts as an empty array. */
    private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())

    /** The caller's retry policy, extended with logging and a health state update on each retry. */
    private val retry = retrySpec.doOnRetry {
        logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
        healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
    }

    /**
     * Creates a provider from [params], using [HealthState.INSTANCE] and a retry policy of at most
     * [MAX_RETRIES] attempts with a jittered fixed backoff of
     * `requestInterval / BACKOFF_INTERVAL_FACTOR`.
     */
    constructor(http: HttpAdapter,
                params: ConfigurationProviderParams) : this(
            http,
            params.configurationUrl,
            params.firstRequestDelay,
            params.requestInterval,
            HealthState.INSTANCE,
            Retry.any<Any>()
                    .retryMax(MAX_RETRIES)
                    .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
                    .jitter(Jitter.random())
    )

    /**
     * Builds the polling pipeline: fetch, drop unchanged bodies, parse JSON, map to
     * [CollectorConfiguration]. Errors from any stage go through [retry].
     */
    override fun invoke(): Flux<CollectorConfiguration> =
            Flux.interval(firstRequestDelay, requestInterval)
                    .concatMap { askForConfig() }
                    .flatMap(::filterDifferentValues)
                    .map(::parseJsonResponse)
                    .map(::createCollectorConfiguration)
                    .retryWhen(retry)

    /** Performs a single GET request, tagging it with a freshly generated invocation id. */
    private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
        val invocationId = UUID.randomUUID()
        http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
    }

    /**
     * Emits the response body only when its digest differs from the last accepted one;
     * otherwise completes empty.
     *
     * The digest is stored before the body is parsed, so a body rejected by a later stage is
     * still remembered and will be treated as unchanged if it is fetched again.
     */
    private fun filterDifferentValues(configuration: BodyWithInvocationId): Mono<String> {
        val configurationString = configuration.body
        val newHash = configurationString.sha256()

        return if (newHash contentEquals lastConfigurationHash.get()) {
            logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
                "No change detected in consul configuration"
            }
            Mono.empty<String>()
        } else {
            logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
                "Obtained new configuration from consul:\n$configurationString"
            }
            lastConfigurationHash.set(newHash)
            Mono.just(configurationString)
        }
    }

    /** Parses [responseString] as a JSON object, closing the reader afterwards. */
    private fun parseJsonResponse(responseString: String): JsonObject =
            Json.createReader(StringReader(responseString)).use { it.readObject() }

    /**
     * Maps the `collector.routing` array of [configuration] to a [CollectorConfiguration].
     *
     * @throws ParsingException when a required key is missing (reported by the JSON API as a
     * [NullPointerException]) or a value has an unexpected JSON type ([ClassCastException])
     */
    private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration =
            try {
                val routingArray = configuration.getJsonArray(ROUTING_CONFIGURATION_KEY)
                CollectorConfiguration(
                        routing {
                            for (route in routingArray) {
                                val routeObj = route.asJsonObject()
                                defineRoute {
                                    fromDomain(routeObj.getString(DOMAIN_CONFIGURATION_KEY))
                                    toTopic(routeObj.getString(TOPIC_CONFIGURATION_KEY))
                                    withFixedPartitioning()
                                }
                            }
                        }.build()
                )
            } catch (e: NullPointerException) {
                // Missing routing array or missing route attribute.
                throw ParsingException(PARSING_FAILURE_MESSAGE, e)
            } catch (e: ClassCastException) {
                // Routing is not an array, a route is not an object, or an attribute is not a string.
                throw ParsingException(PARSING_FAILURE_MESSAGE, e)
            }

    /** Response body paired with the id of the request that produced it (used for log markers). */
    private data class BodyWithInvocationId(val body: String, val invocationId: UUID)

    companion object {
        private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
        private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
        private const val TOPIC_CONFIGURATION_KEY = "toTopic"

        private const val PARSING_FAILURE_MESSAGE = "Failed to parse consul configuration"

        private const val MAX_RETRIES = 5L
        private const val BACKOFF_INTERVAL_FACTOR = 30L
        private val logger = Logger(ConsulConfigurationProvider::class)

        /** SHA-256 digest of the string's bytes. */
        private fun String.sha256() =
                MessageDigest
                        .getInstance("SHA-256")
                        .digest(toByteArray())
    }
}