2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.adapters
22 import com.google.gson.JsonObject
23 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
24 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
25 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
26 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
27 import org.onap.dcae.collectors.veshv.model.ServiceContext
28 import org.onap.dcae.collectors.veshv.utils.logging.Logger
29 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
30 import org.onap.dcaegen2.services.sdk.model.streams.StreamType
31 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
32 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
33 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates
38 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
39 import reactor.core.publisher.Flux
40 import reactor.core.publisher.Mono
41 import reactor.retry.Jitter
42 import reactor.retry.Retry
43 import java.time.Duration
46 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
49 internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
50 private val firstRequestDelay: Duration,
51 private val requestInterval: Duration,
52 private val healthState: HealthState,
53 private val streamParser: StreamFromGsonParser<KafkaSink>,
56 ) : ConfigurationProvider {
57 constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
59 params.firstRequestDelay,
60 params.requestInterval,
62 StreamFromGsonParsers.kafkaSinkParser(),
64 .retryMax(MAX_RETRIES)
65 .fixedBackoff(params.requestInterval)
66 .jitter(Jitter.random())
69 private val retry = retrySpec.doOnRetry {
70 logger.withWarn(ServiceContext::mdc) {
71 log("Exception from configuration provider client, retrying subscription", it.exception())
73 healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
76 override fun invoke(): Flux<Sequence<KafkaSink>> =
78 .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
79 .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
81 .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
82 .flatMapMany(::handleUpdates)
84 private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
85 .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
88 .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
89 .map(::createCollectorConfiguration)
90 .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
94 private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
96 DataStreams.namedSinks(configuration)
97 .filter(StreamPredicates.streamOfType(StreamType.KAFKA))
98 .map(streamParser::unsafeParse)
100 } catch (e: NullPointerException) {
101 throw ParsingException("Failed to parse configuration", e)
106 private const val MAX_RETRIES = 5L
107 private val logger = Logger(ConfigurationProviderImpl::class)