5b0dca2d3ab36c67fbca6e68ee661f63355e0df3
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.adapters
21
22 import com.google.gson.JsonObject
23 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
24 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
25 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
26 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
27 import org.onap.dcae.collectors.veshv.model.ServiceContext
28 import org.onap.dcae.collectors.veshv.utils.logging.Logger
29 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
30 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
31 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
32 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
33 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
34 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
35 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
36 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
37 import reactor.core.publisher.Flux
38 import reactor.core.publisher.Mono
39 import reactor.retry.Jitter
40 import reactor.retry.Retry
41 import java.time.Duration
42
/**
 * [ConfigurationProvider] which obtains a [CbsClient] from [cbsClientMono] and converts every
 * configuration update emitted by that client into a sequence of [KafkaSink] definitions.
 *
 * Both obtaining the client and processing its updates are retried according to the supplied
 * retry specification. Every retry is logged at warn level and reported to [healthState] as
 * [HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION].
 *
 * @param cbsClientMono source of the CBS client used to request configuration
 * @param firstRequestDelay passed to [CbsClient.updates] as the delay before the first request
 * @param requestInterval passed to [CbsClient.updates] as the interval between requests
 * @param healthState health state holder notified whenever a retry takes place
 * @param streamParser parser used to convert raw sink definitions into [KafkaSink] objects
 * @param retrySpec base retry policy; it is decorated here with logging and health reporting
 *
 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
 * @since May 2018
 */
internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
                                         private val firstRequestDelay: Duration,
                                         private val requestInterval: Duration,
                                         private val healthState: HealthState,
                                         private val streamParser: StreamFromGsonParser<KafkaSink>,
                                         retrySpec: Retry<Any>
) : ConfigurationProvider {
    /**
     * Creates a provider configured from [params].
     *
     * Uses [HealthState.INSTANCE], the Kafka sink parser from [StreamFromGsonParsers] and a retry
     * policy which retries on any error at most [MAX_RETRIES] times, with a fixed backoff equal to
     * `params.requestInterval` and random jitter.
     */
    constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
            cbsClientMono,
            params.firstRequestDelay,
            params.requestInterval,
            HealthState.INSTANCE,
            StreamFromGsonParsers.kafkaSinkParser(),
            Retry.any<Any>()
                    .retryMax(MAX_RETRIES)
                    .fixedBackoff(params.requestInterval)
                    .jitter(Jitter.random())
    )

    /**
     * The supplied retry policy extended with a side effect executed on every retry:
     * a warning is logged and the health state is switched to "retrying".
     */
    private val retry = retrySpec.doOnRetry {
        logger.withWarn(ServiceContext::mdc) {
            log("Exception from configuration provider client, retrying subscription", it.exception())
        }
        healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
    }

    /**
     * Returns a stream of Kafka sink sequences, one element per configuration update received
     * from the CBS client. Failures to obtain the client are logged and retried using [retry].
     */
    override fun invoke(): Flux<Sequence<KafkaSink>> =
            cbsClientMono
                    .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
                    .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
                    .retryWhen(retry)
                    .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
                    .flatMapMany(::handleUpdates)

    /**
     * Requests configuration updates from [cbsClient] and maps each received configuration to
     * the Kafka sinks it declares. Errors are logged and the stream is retried using [retry].
     */
    private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
            .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
                    firstRequestDelay,
                    requestInterval)
            .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
            .map(::createCollectorConfiguration)
            .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
            .retryWhen(retry)

    /**
     * Extracts the named sinks of type [KAFKA_STREAM_TYPE] from [configuration] and parses them
     * with [streamParser].
     *
     * The result is fully materialised before it is returned.
     * NOTE(review): [DataStreams.namedSinks] appears to return a lazily evaluated collection
     * (confirm against the SDK). If so, returning it unevaluated would defer parsing until the
     * sequence is iterated by a downstream consumer, i.e. outside of this `try` block and outside
     * of the error logging and retry stages applied in [handleUpdates].
     *
     * @throws ParsingException when a [NullPointerException] is raised while extracting or parsing
     * the sinks
     */
    private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
            try {
                DataStreams.namedSinks(configuration)
                        .filter { it.type() == KAFKA_STREAM_TYPE }
                        .map(streamParser::unsafeParse)
                        .asSequence()
                        // Force evaluation here so that parsing failures surface inside this try block.
                        .toList()
                        .asSequence()
            } catch (e: NullPointerException) {
                throw ParsingException("Failed to parse configuration", e)
            }

    companion object {
        /** Upper bound of retries used by the [CbsConfiguration]-based constructor. */
        private const val MAX_RETRIES = 5L

        /** Value of a stream's `type()` which marks it as a Kafka sink. */
        private const val KAFKA_STREAM_TYPE = "kafka"

        private val logger = Logger(ConfigurationProviderImpl::class)
    }
}