Use DataStream API from CBS client
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / ConfigurationProviderTest.kt
index f830f2c..e71250c 100644 (file)
@@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials
 import reactor.core.publisher.Flux
-
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
 import reactor.test.StepVerifier
@@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({
 
                     StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
-
-                                val route1 = it.routes[0]
-                                assertThat(FAULT.domainName)
-                                        .describedAs("routed domain 1")
-                                        .isEqualTo(route1.domain)
-                                assertThat("test-topic-1")
-                                        .describedAs("target topic 1")
-                                        .isEqualTo(route1.targetTopic)
-
-                                val route2 = it.routes[1]
-                                assertThat(HEARTBEAT.domainName)
-                                        .describedAs("routed domain 2")
-                                        .isEqualTo(route2.domain)
-                                assertThat("test-topic-2")
-                                        .describedAs("target topic 2")
-                                        .isEqualTo(route2.targetTopic)
-
+                                val receivedSink1 = it.elementAt(0)
+                                val receivedSink2 = it.elementAt(1)
+
+                                assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
+                                assertThat(receivedSink1.bootstrapServers())
+                                        .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
+                                assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+
+                                assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
+                                assertThat(receivedSink2.bootstrapServers())
+                                        .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
+                                assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
                             }.verifyComplete()
                 }
             }
@@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({
 
 })
 
+private val aafCredentials1 = ImmutableAafCredentials.builder()
+        .username("client")
+        .password("very secure password")
+        .build()
+
+private val aafCredentials2 = ImmutableAafCredentials.builder()
+        .username("other_client")
+        .password("another very secure password")
+        .build()
 
 private val validConfiguration = JsonParser().parse("""
 {
-    "whatever": "garbage",
-    "collector.routing": [
-            {
-                "fromDomain": "fault",
-                "toTopic": "test-topic-1"
+    "streams_publishes": {
+        "perf3gpp_regional": {
+            "type": "kafka",
+            "aaf_credentials": {
+                "username": "client",
+                "password": "very secure password"
+            },
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+                "topic_name": "REG_HVVES_PERF3GPP"
+            }
+        },
+        "perf3gpp_central": {
+            "type": "kafka",
+            "aaf_credentials": {
+                "username": "other_client",
+                "password": "another very secure password"
             },
-            {
-                "fromDomain": "heartbeat",
-                "toTopic": "test-topic-2"
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
+                "topic_name": "CEN_HVVES_PERF3GPP"
             }
-    ]
+        }
+    }
 }""").asJsonObject
 
 private val invalidConfiguration = JsonParser().parse("""
 {
-    "whatever": "garbage",
-    "collector.routing": [
-            {
-                "fromDomain": "garbage",
-                "meaningful": "garbage"
+    "streams_publishes": {
+        "perf3gpp_regional": {
+            "type": "kafka",
+            "aaf_credentials": {
+                "username": "client",
+                "password": "very secure password"
+            },
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+                "popic_name": "REG_HVVES_PERF3GPP"
             }
-    ]
+        }
+    }
 }""").asJsonObject
 
 private val firstRequestDelay = Duration.ofMillis(1)
 private val requestInterval = Duration.ofMillis(1)
+private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
 
 private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
                                            healthState: HealthState,
@@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
             firstRequestDelay,
             requestInterval,
             healthState,
+            streamParser,
             retry
     )
 }