import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
import reactor.retry.Retry
import java.io.StringReader
import java.time.Duration
+import java.util.*
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
private val retry = retrySpec
.doOnRetry {
- logger.withWarn { log("Could not get fresh configuration", it.exception()) }
+ logger.withWarn(ServiceContext::mdc) { log("Could not get fresh configuration", it.exception()) }
healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
.map(::createCollectorConfiguration)
.retryWhen(retry)
- private fun askForConfig(): Mono<String> = http.get(url)
+ private fun askForConfig(): Mono<BodyWithInvocationId> = Mono.defer {
+ val invocationId = UUID.randomUUID()
+ http.get(url, invocationId).map { BodyWithInvocationId(it, invocationId) }
+ }
- private fun filterDifferentValues(configurationString: String) =
- hashOf(configurationString).let {
- if (it == lastConfigurationHash.get()) {
- logger.trace { "No change detected in consul configuration" }
- Mono.empty()
- } else {
- logger.info { "Obtained new configuration from consul:\n${configurationString}" }
- lastConfigurationHash.set(it)
- Mono.just(configurationString)
+ private fun filterDifferentValues(configuration: BodyWithInvocationId) =
+ configuration.body.let { configurationString ->
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ logger.trace(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "No change detected in consul configuration"
+ }
+ Mono.empty()
+ } else {
+ logger.info(ServiceContext::mdc, Marker.Invoke(configuration.invocationId)) {
+ "Obtained new configuration from consul:\n${configurationString}"
+ }
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
}
}
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
-}
+ private data class BodyWithInvocationId(val body: String, val invocationId: UUID)
+}