Use DataStream API from CBS client 29/82929/6
authorkjaniak <kornel.janiak@nokia.com>
Thu, 21 Mar 2019 13:03:53 +0000 (14:03 +0100)
committerkjaniak <kornel.janiak@nokia.com>
Tue, 26 Mar 2019 12:05:51 +0000 (13:05 +0100)
Change-Id: Ief92f793282288938c6663616e9613c6df2d8ddb
Issue-ID: DCAEGEN2-1346
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
sources/hv-collector-core/pom.xml
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt

index 823f671..e7134e1 100644 (file)
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>cbs-client</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
index 782d232..f475a0e 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -27,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
 import reactor.core.publisher.Flux
 
 interface Sink {
@@ -48,5 +48,5 @@ interface SinkProvider : Closeable {
 }
 
 interface ConfigurationProvider {
-    operator fun invoke(): Flux<Routing>
+    operator fun invoke(): Flux<Sequence<KafkaSink>>
 }
index c08df74..c674ef3 100644 (file)
@@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -37,6 +36,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
 import java.util.concurrent.atomic.AtomicReference
 
 /**
@@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
                        private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val config = AtomicReference<Routing>()
+        val config = AtomicReference<Sequence<KafkaSink>>()
         configuration()
                 .doOnNext {
                     logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -71,12 +71,12 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
         }
     }
 
-    private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
+    private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector =
             VesHvCollector(
                     clientContext = ctx,
                     wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
                     protobufDecoder = VesDecoder(),
-                    router = Router(routing, ctx),
+                    router = Router(kafkaSinks, ctx),
                     sink = sinkProvider(ctx),
                     metrics = metrics)
 
index bd92c6d..723ba39 100644 (file)
@@ -21,13 +21,29 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Option
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
 
 class Router(private val routing: Routing, private val ctx: ClientContext) {
+
+    constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
+            routing {
+                kafkaSinks.forEach {
+                    defineRoute {
+                        fromDomain(it.name())
+                        toTopic(it.topicName())
+                        withFixedPartitioning()
+                    }
+                }
+            }.build(),
+            ctx
+    )
+
     fun findDestination(message: VesMessage): Option<RoutedMessage> =
             routing.routeFor(message.header).map { it(message) }.also {
                 if (it.isEmpty()) {
index f96350a..5b0dca2 100644 (file)
@@ -22,8 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 import com.google.gson.JsonObject
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -31,6 +29,10 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -46,6 +48,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
                                          private val firstRequestDelay: Duration,
                                          private val requestInterval: Duration,
                                          private val healthState: HealthState,
+                                         private val streamParser: StreamFromGsonParser<KafkaSink>,
                                          retrySpec: Retry<Any>
 
 ) : ConfigurationProvider {
@@ -54,6 +57,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
             params.firstRequestDelay,
             params.requestInterval,
             HealthState.INSTANCE,
+            StreamFromGsonParsers.kafkaSinkParser(),
             Retry.any<Any>()
                     .retryMax(MAX_RETRIES)
                     .fixedBackoff(params.requestInterval)
@@ -67,7 +71,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
         healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
     }
 
-    override fun invoke(): Flux<Routing> =
+    override fun invoke(): Flux<Sequence<KafkaSink>> =
             cbsClientMono
                     .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
                     .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +79,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
                     .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
                     .flatMapMany(::handleUpdates)
 
-    private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
+    private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
             .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
                     firstRequestDelay,
                     requestInterval)
@@ -85,31 +89,18 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
             .retryWhen(retry)
 
 
-    private fun createCollectorConfiguration(configuration: JsonObject): Routing =
-            try {
-                val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
-                routing {
-                    for (route in routingArray) {
-                        val routeObj = route.asJsonObject
-                        defineRoute {
-                            fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
-                            toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
-                            withFixedPartitioning()
-                        }
-                    }
-                }.build()
-            } catch (e: NullPointerException) {
-                throw ParsingException("Failed to parse configuration", e)
-            }
-
-    private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+    private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> {
+        try {
+            val sinks = DataStreams.namedSinks(configuration)
+                    .filter { it.type() == "kafka" }
+            return sinks.map(streamParser::unsafeParse).asSequence()
 
+        } catch (e: NullPointerException) {
+            throw ParsingException("Failed to parse configuration", e)
+        }
+    }
 
     companion object {
-        private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
-        private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
-        private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
         private const val MAX_RETRIES = 5L
         private val logger = Logger(ConfigurationProviderImpl::class)
     }
index f830f2c..e71250c 100644 (file)
@@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials
 import reactor.core.publisher.Flux
-
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
 import reactor.test.StepVerifier
@@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({
 
                     StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
-
-                                val route1 = it.routes[0]
-                                assertThat(FAULT.domainName)
-                                        .describedAs("routed domain 1")
-                                        .isEqualTo(route1.domain)
-                                assertThat("test-topic-1")
-                                        .describedAs("target topic 1")
-                                        .isEqualTo(route1.targetTopic)
-
-                                val route2 = it.routes[1]
-                                assertThat(HEARTBEAT.domainName)
-                                        .describedAs("routed domain 2")
-                                        .isEqualTo(route2.domain)
-                                assertThat("test-topic-2")
-                                        .describedAs("target topic 2")
-                                        .isEqualTo(route2.targetTopic)
-
+                                val receivedSink1 = it.elementAt(0)
+                                val receivedSink2 = it.elementAt(1)
+
+                                assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
+                                assertThat(receivedSink1.bootstrapServers())
+                                        .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
+                                assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+
+                                assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
+                                assertThat(receivedSink2.bootstrapServers())
+                                        .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
+                                assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
                             }.verifyComplete()
                 }
             }
@@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({
 
 })
 
+private val aafCredentials1 = ImmutableAafCredentials.builder()
+        .username("client")
+        .password("very secure password")
+        .build()
+
+private val aafCredentials2 = ImmutableAafCredentials.builder()
+        .username("other_client")
+        .password("another very secure password")
+        .build()
 
 private val validConfiguration = JsonParser().parse("""
 {
-    "whatever": "garbage",
-    "collector.routing": [
-            {
-                "fromDomain": "fault",
-                "toTopic": "test-topic-1"
+    "streams_publishes": {
+        "perf3gpp_regional": {
+            "type": "kafka",
+            "aaf_credentials": {
+                "username": "client",
+                "password": "very secure password"
+            },
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+                "topic_name": "REG_HVVES_PERF3GPP"
+            }
+        },
+        "perf3gpp_central": {
+            "type": "kafka",
+            "aaf_credentials": {
+                "username": "other_client",
+                "password": "another very secure password"
             },
-            {
-                "fromDomain": "heartbeat",
-                "toTopic": "test-topic-2"
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
+                "topic_name": "CEN_HVVES_PERF3GPP"
             }
-    ]
+        }
+    }
 }""").asJsonObject
 
 private val invalidConfiguration = JsonParser().parse("""
 {
-    "whatever": "garbage",
-    "collector.routing": [
-            {
-                "fromDomain": "garbage",
-                "meaningful": "garbage"
+    "streams_publishes": {
+        "perf3gpp_regional": {
+            "type": "kafka",
+            "aaf_credentials": {
+                "username": "client",
+                "password": "very secure password"
+            },
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+                "popic_name": "REG_HVVES_PERF3GPP"
             }
-    ]
+        }
+    }
 }""").asJsonObject
 
 private val firstRequestDelay = Duration.ofMillis(1)
 private val requestInterval = Duration.ofMillis(1)
+private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
 
 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
                                            healthState: HealthState,
@@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
             firstRequestDelay,
             requestInterval,
             healthState,
+            streamParser,
             retry
     )
 }
index bd056d4..a6b32ed 100644 (file)
@@ -35,8 +35,8 @@ import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@ object MetricsSpecification : Spek({
 
     describe("Messages sent metrics") {
         it("should gather info for each topic separately") {
-            val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(PERF3GPP),
@@ -130,7 +130,7 @@ object MetricsSpecification : Spek({
 
     describe("Messages dropped metrics") {
         it("should gather metrics for invalid messages") {
-            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
 
             sut.handleConnection(
                     messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@ object MetricsSpecification : Spek({
         }
 
         it("should gather metrics for route not found") {
-            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@ object MetricsSpecification : Spek({
         }
 
         it("should gather metrics for sing errors") {
-            val sut = vesHvWithAlwaysFailingSink(basicRouting)
+            val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
 
             sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
 
@@ -171,7 +171,7 @@ object MetricsSpecification : Spek({
         }
 
         it("should gather summed metrics for dropped messages") {
-            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
index ece4228..50fe098 100644 (file)
@@ -34,7 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
         it("should handle multiple clients in reasonable time") {
             val sink = CountingSink()
             val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicRouting)
+            sut.configurationProvider.updateConfiguration(configWithBasicRouting)
 
             val numMessages: Long = 300_000
             val runs = 4
@@ -88,7 +88,7 @@ object PerformanceSpecification : Spek({
         it("should disconnect on transmission errors") {
             val sink = CountingSink()
             val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(basicRouting)
+            sut.configurationProvider.updateConfiguration(configWithBasicRouting)
 
             val numMessages: Long = 100_000
             val timeout = Duration.ofSeconds(30)
index e84e948..da9290d 100644 (file)
@@ -38,7 +38,9 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
 import reactor.core.publisher.Flux
 import java.time.Duration
 import java.util.concurrent.atomic.AtomicBoolean
@@ -101,17 +103,17 @@ fun Sut.handleConnection(vararg packets: ByteBuf) {
     collector.handleConnection(Flux.fromArray(packets)).block(timeout)
 }
 
-fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
+fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
         Sut(AlwaysSuccessfulSink()).apply {
-            configurationProvider.updateConfiguration(routing)
+            configurationProvider.updateConfiguration(kafkaSinks)
         }
 
-fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
+fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
         Sut(AlwaysFailingSink()).apply {
-            configurationProvider.updateConfiguration(routing)
+            configurationProvider.updateConfiguration(kafkaSinks)
         }
 
-fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
+fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
         Sut(DelayingSink(delay)).apply {
-            configurationProvider.updateConfiguration(routing)
+            configurationProvider.updateConfiguration(kafkaSinks)
         }
index 17f6ce3..21c5c18 100644 (file)
@@ -33,10 +33,10 @@ import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -152,7 +152,7 @@ object VesHvSpecification : Spek({
         it("should be able to direct 2 messages from different domains to one topic") {
             val (sut, sink) = vesHvWithStoringSink()
 
-            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
+            sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
 
             val messages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP),
@@ -205,7 +205,7 @@ object VesHvSpecification : Spek({
             it("should update collector") {
                 val firstCollector = sut.collector
 
-                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
                 val collectorAfterUpdate = sut.collector
 
                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,12 +213,12 @@ object VesHvSpecification : Spek({
 
             it("should start routing messages") {
 
-                sut.configurationProvider.updateConfiguration(emptyRouting)
+                sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
 
                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messages).isEmpty()
 
-                sut.configurationProvider.updateConfiguration(basicRouting)
+                sut.configurationProvider.updateConfiguration(configWithBasicRouting)
 
                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messagesAfterUpdate).hasSize(1)
@@ -242,7 +242,7 @@ object VesHvSpecification : Spek({
                         .isEqualTo(0)
 
 
-                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
 
                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messagesAfterUpdate).hasSize(2)
@@ -261,7 +261,7 @@ object VesHvSpecification : Spek({
 
                 Flux.range(0, messagesAmount).doOnNext {
                     if (it == messagesForEachTopic) {
-                        sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                        sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
                     }
                 }.doOnNext {
                     sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -287,7 +287,7 @@ object VesHvSpecification : Spek({
                 val incomingMessages = Flux.range(0, messageStreamSize)
                         .doOnNext {
                             if (it == pivot) {
-                                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
                                 println("config changed")
                             }
                         }
@@ -320,7 +320,7 @@ object VesHvSpecification : Spek({
         given("failed configuration change") {
             val (sut, _) = vesHvWithStoringSink()
             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
-            sut.configurationProvider.updateConfiguration(basicRouting)
+            sut.configurationProvider.updateConfiguration(configWithBasicRouting)
 
             it("should mark the application unhealthy ") {
                 assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +349,6 @@ object VesHvSpecification : Spek({
 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
     val sink = StoringSink()
     val sut = Sut(sink)
-    sut.configurationProvider.updateConfiguration(basicRouting)
+    sut.configurationProvider.updateConfiguration(configWithBasicRouting)
     return Pair(sut, sink)
 }
index 1ad2b0e..213eff2 100644 (file)
 package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
 import reactor.core.publisher.FluxProcessor
 import reactor.core.publisher.UnicastProcessor
 import reactor.retry.RetryExhaustedException
@@ -33,56 +34,54 @@ import reactor.retry.RetryExhaustedException
 const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
 const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
 const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-
-val basicRouting = routing {
-    defineRoute {
-        fromDomain(PERF3GPP.domainName)
-        toTopic(PERF3GPP_TOPIC)
-        withFixedPartitioning()
-    }
-}.build()
-
-
-val twoDomainsToOneTopicRouting = routing {
-    defineRoute {
-        fromDomain(PERF3GPP.domainName)
-        toTopic(PERF3GPP_TOPIC)
-        withFixedPartitioning()
-    }
-    defineRoute {
-        fromDomain(HEARTBEAT.domainName)
-        toTopic(PERF3GPP_TOPIC)
-        withFixedPartitioning()
-    }
-    defineRoute {
-        fromDomain(MEASUREMENT.domainName)
-        toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
-        withFixedPartitioning()
-    }
-}.build()
-
-
-val configurationWithDifferentRouting = routing {
-    defineRoute {
-        fromDomain(PERF3GPP.domainName)
-        toTopic(ALTERNATE_PERF3GPP_TOPIC)
-        withFixedPartitioning()
-    }
-}.build()
-
-
-val emptyRouting = routing { }.build()
+const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
+
+val configWithBasicRouting = sequenceOf(
+    ImmutableKafkaSink.builder()
+            .name(PERF3GPP.domainName)
+            .topicName(PERF3GPP_TOPIC)
+            .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+            .build()
+)
+
+val configWithTwoDomainsToOneTopicRouting = sequenceOf(
+        ImmutableKafkaSink.builder()
+                .name(PERF3GPP.domainName)
+                .topicName(PERF3GPP_TOPIC)
+                .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+                .build(),
+        ImmutableKafkaSink.builder()
+                .name(HEARTBEAT.domainName)
+                .topicName(PERF3GPP_TOPIC)
+                .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+                .build(),
+        ImmutableKafkaSink.builder()
+                .name(MEASUREMENT.domainName)
+                .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+                .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+                .build()
+)
+
+val configWithDifferentRouting = sequenceOf(
+                ImmutableKafkaSink.builder()
+                        .name(PERF3GPP.domainName)
+                        .topicName(ALTERNATE_PERF3GPP_TOPIC)
+                        .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
+                        .build()
+        )
+
+val configWithEmptyRouting = emptySequence<KafkaSink>()
 
 
 class FakeConfigurationProvider : ConfigurationProvider {
     private var shouldThrowException = false
-    private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
+    private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
 
-    fun updateConfiguration(routing: Routing) =
+    fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
             if (shouldThrowException) {
                 configStream.onError(RetryExhaustedException("I'm so tired"))
             } else {
-                configStream.onNext(routing)
+                configStream.onNext(kafkaSinkSequence)
             }