import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
private val healthState: HealthState,
+ private val streamParser: StreamFromGsonParser<KafkaSink>,
retrySpec: Retry<Any>
) : ConfigurationProvider {
params.firstRequestDelay,
params.requestInterval,
HealthState.INSTANCE,
+ StreamFromGsonParsers.kafkaSinkParser(),
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval)
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<Routing> =
+ override fun invoke(): Flux<Sequence<KafkaSink>> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
.updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
firstRequestDelay,
requestInterval)
.retryWhen(retry)
- private fun createCollectorConfiguration(configuration: JsonObject): Routing =
- try {
- val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject
- defineRoute {
- fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
- private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+ private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> {
+ try {
+ val sinks = DataStreams.namedSinks(configuration)
+ .filter { it.type() == "kafka" }
+ return sinks.map(streamParser::unsafeParse).asSequence()
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
+ }
companion object {
- private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
- private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
- private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
private const val MAX_RETRIES = 5L
private val logger = Logger(ConfigurationProviderImpl::class)
}