Use DataStream API from CBS client
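
Routing is no longer assembled from the custom "collector.routing" JSON inside ConfigurationProviderImpl; instead the CBS client SDK's DataStreams API picks the Kafka sinks out of the configuration's streams section and parses them with StreamFromGsonParsers.kafkaSinkParser(). A minimal standalone sketch of that parsing flow (not part of this change; the kafkaSinks helper and the JSON-string entry point are illustrative only, the call chain mirrors createCollectorConfiguration() in the diff below):

    import com.google.gson.JsonParser
    import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
    import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
    import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink

    // Parse every Kafka sink declared in the CBS configuration (streams_publishes section).
    fun kafkaSinks(configurationJson: String): Sequence<KafkaSink> {
        val configuration = JsonParser().parse(configurationJson).asJsonObject
        val parser = StreamFromGsonParsers.kafkaSinkParser()
        return DataStreams.namedSinks(configuration)   // all named sinks in the configuration
                .filter { it.type() == "kafka" }       // keep only Kafka streams
                .map(parser::unsafeParse)              // raw JSON -> typed KafkaSink
                .asSequence()
    }
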
[dcaegen2/collectors/hv-ves.git] sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index f96350a..5b0dca2 100644
@@ -22,8 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 import com.google.gson.JsonObject
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -31,6 +29,10 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -46,6 +48,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
                                          private val firstRequestDelay: Duration,
                                          private val requestInterval: Duration,
                                          private val healthState: HealthState,
+                                         private val streamParser: StreamFromGsonParser<KafkaSink>,
                                          retrySpec: Retry<Any>
 
 ) : ConfigurationProvider {
@@ -54,6 +57,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
             params.firstRequestDelay,
             params.requestInterval,
             HealthState.INSTANCE,
+            StreamFromGsonParsers.kafkaSinkParser(),
             Retry.any<Any>()
                     .retryMax(MAX_RETRIES)
                     .fixedBackoff(params.requestInterval)
@@ -67,7 +71,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
         healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
     }
 
-    override fun invoke(): Flux<Routing> =
+    override fun invoke(): Flux<Sequence<KafkaSink>> =
             cbsClientMono
                     .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
                     .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +79,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
                     .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
                     .flatMapMany(::handleUpdates)
 
-    private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
+    private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
             .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
                     firstRequestDelay,
                     requestInterval)
@@ -85,31 +89,18 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
             .retryWhen(retry)
 
 
-    private fun createCollectorConfiguration(configuration: JsonObject): Routing =
-            try {
-                val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
-                routing {
-                    for (route in routingArray) {
-                        val routeObj = route.asJsonObject
-                        defineRoute {
-                            fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
-                            toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
-                            withFixedPartitioning()
-                        }
-                    }
-                }.build()
-            } catch (e: NullPointerException) {
-                throw ParsingException("Failed to parse configuration", e)
-            }
-
-    private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+    private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> {
+        try {
+            val sinks = DataStreams.namedSinks(configuration)
+                    .filter { it.type() == "kafka" }
+            return sinks.map(streamParser::unsafeParse).asSequence()
 
+        } catch (e: NullPointerException) {
+            throw ParsingException("Failed to parse configuration", e)
+        }
+    }
 
     companion object {
-        private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
-        private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
-        private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
         private const val MAX_RETRIES = 5L
         private val logger = Logger(ConfigurationProviderImpl::class)
     }
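
With this change the provider emits Flux<Sequence<KafkaSink>> and leaves building the routing out of those sinks to its consumer. A hypothetical consumer sketch (topicName() and bootstrapServers() are assumed from the SDK's KafkaSink model; the subscriber body is illustrative only):

    // configurationProvider is assumed to be an instance of ConfigurationProviderImpl
    // obtained from the boundary interface; each emission is one CBS configuration update.
    configurationProvider.invoke()
            .map { sinks -> sinks.map { it.topicName() to it.bootstrapServers() }.toList() }
            .subscribe { topics ->
                // react to every update pushed by CBS, e.g. (re)create Kafka publishers
                println("Kafka sinks from CBS: $topics")
            }
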