2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.config.impl
22 import arrow.core.None
23 import arrow.core.Option
24 import arrow.core.Some
25 import com.google.gson.JsonObject
26 import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
27 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
28 import org.onap.dcae.collectors.veshv.config.api.model.Route
29 import org.onap.dcae.collectors.veshv.config.api.model.Routing
30 import org.onap.dcae.collectors.veshv.utils.logging.Logger
31 import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
32 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
33 import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
34 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
36 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
37 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
38 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
39 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
40 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
41 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
42 import reactor.core.publisher.Flux
43 import reactor.core.publisher.Mono
44 import reactor.retry.Jitter
45 import reactor.retry.Retry
48 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
// Provides configuration obtained through a CBS client; invoke() exposes the
// result as a Flux<PartialConfiguration>.
//
// cbsClientMono              - deferred source of the CbsClient.
// cbsConfiguration           - supplies firstRequestDelay and requestInterval, which are
//                              passed to CbsClient.updates in handleUpdates.
// streamParser               - parser applied (via unsafeParse) to each Kafka sink stream.
// configurationStateListener - notified through retrying() every time a retry is triggered.
// retrySpec                  - base retry policy; it is not stored directly, only the
//                              doOnRetry-decorated version kept in the `retry` property.
// mdc                        - diagnostic context handed to every logging call in this class.
51 internal class CbsConfigurationProvider(private val cbsClientMono: Mono<CbsClient>,
52 private val cbsConfiguration: CbsConfiguration,
53 private val streamParser: StreamFromGsonParser<KafkaSink>,
54 private val configurationStateListener: ConfigurationStateListener,
55 retrySpec: Retry<Any>,
56 private val mdc: MappedDiagnosticContext
/**
 * Convenience constructor that fills in a default stream parser and retry policy.
 *
 * The default parser is [StreamFromGsonParsers.kafkaSinkParser]. The default retry policy is
 * configured with `retryMax(MAX_RETRIES)`, a fixed backoff equal to `cbsConfig.requestInterval`
 * and random jitter.
 *
 * NOTE(review): the delegation call and the receiver of the retry builder chain are not
 * visible in this excerpt - presumably this delegates to the primary constructor; confirm.
 */
59 constructor(cbsClientMono: Mono<CbsClient>,
60 cbsConfig: CbsConfiguration,
61 configurationStateListener: ConfigurationStateListener,
62 mdc: MappedDiagnosticContext) :
66 StreamFromGsonParsers.kafkaSinkParser(),
67 configurationStateListener,
// Retry policy: bounded number of retries, constant delay between them, randomised by jitter.
69 .retryMax(MAX_RETRIES)
70 .fixedBackoff(cbsConfig.requestInterval)
71 .jitter(Jitter.random()),
/**
 * The supplied retry policy decorated with a `doOnRetry` hook.
 *
 * On every retry the hook logs a warning that carries the exception which caused the retry,
 * and then notifies [configurationStateListener] through `retrying()`.
 */
75 private val retry = retrySpec.doOnRetry {
76 logger.withWarn(mdc) {
// `it` is the retry context; its exception() is attached to the log entry.
77 log("Exception from configuration provider client, retrying subscription", it.exception())
// Let the listener know that the provider is currently retrying.
79 configurationStateListener.retrying()
/**
 * Starts the configuration stream.
 *
 * The visible pipeline logs successful creation of the CBS client, logs a failure to obtain
 * it, logs (at trace level) when the client subscription finishes, and finally maps the
 * client into a stream of updates through [handleUpdates].
 *
 * NOTE(review): the receiver of this chain and one intermediate step are not visible in this
 * excerpt - presumably the chain starts from `cbsClientMono`; confirm.
 *
 * @return a [Flux] of [PartialConfiguration] objects.
 */
82 operator fun invoke(): Flux<PartialConfiguration> =
84 .doOnNext { logger.info(mdc) { "CBS client successfully created" } }
85 .onErrorLog(logger, mdc) { "Failed to retrieve CBS client" }
87 .doFinally { logger.trace(mdc) { "CBS client subscription finished" } }
// Switch from the single client to the (possibly multi-element) stream of its updates.
88 .flatMapMany(::handleUpdates)
/**
 * Subscribes to configuration updates of the given client and converts each of them into a
 * [PartialConfiguration].
 *
 * Every received JSON configuration is logged, turned into a routing description by
 * [createRoutingDescription] and wrapped as `PartialConfiguration(routing = ...)`. Errors in
 * this part of the pipeline are logged with "Error while creating configuration".
 *
 * NOTE(review): one pipeline step between the error logging and the final `map` is not
 * visible in this excerpt.
 *
 * @param cbsClient client used to request configuration updates.
 */
90 private fun handleUpdates(cbsClient: CbsClient) = cbsClient
// Request is created with a fresh diagnostic context; delay and interval come from cbsConfiguration.
91 .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
92 cbsConfiguration.firstRequestDelay,
93 cbsConfiguration.requestInterval)
94 .doOnNext { logger.info(mdc) { "Received new configuration:\n$it" } }
95 .map(::createRoutingDescription)
96 .onErrorLog(logger, mdc) { "Error while creating configuration" }
// The routing value produced above (an Option<Routing>) becomes the `routing` field.
98 .map { PartialConfiguration(routing = it) }
/**
 * Builds the routing description from the sinks declared in the received configuration.
 *
 * Only named sinks of stream type `KAFKA` are considered. Each one is parsed with
 * [streamParser] and converted to a [Route] whose name is the sink's name.
 *
 * A [NullPointerException] raised while doing so is caught and logged as a warning
 * ("Invalid streams configuration") instead of being propagated.
 *
 * NOTE(review): the value produced on success and the fallback returned from the catch branch
 * are not visible in this excerpt - presumably `Some(Routing(...))` and `None`; confirm.
 * NOTE(review): catching NullPointerException is used here as a validation mechanism; it may
 * be worth checking whether the parser can report invalid input in a more explicit way.
 *
 * @param configuration JSON configuration received from CBS.
 * @return the routing wrapped in an [Option].
 */
100 private fun createRoutingDescription(configuration: JsonObject): Option<Routing> = try {
101 val routes = DataStreams.namedSinks(configuration)
// Keep Kafka sinks only; other stream types are ignored.
102 .filter(streamOfType(KAFKA))
103 .map(streamParser::unsafeParse)
// The route is identified by the sink name and carries the parsed sink itself.
104 .map { Route(it.name(), it) }
108 } catch (e: NullPointerException) {
109 logger.withWarn(mdc) {
110 log("Invalid streams configuration", e)
/** Value passed to `retryMax` when the secondary constructor builds the default retry policy. */
116 private const val MAX_RETRIES = 5L
/** Logger shared by all instances of [CbsConfigurationProvider]. */
117 private val logger = Logger(CbsConfigurationProvider::class)