Redefine Routing
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / VesHvSpecification.kt
index 21c5c18..5d215fc 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.tests.component
 
+import arrow.core.None
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
@@ -30,13 +31,12 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -65,12 +65,28 @@ object VesHvSpecification : Spek({
                     .hasSize(2)
         }
 
+        it("should create sink lazily") {
+            val (sut, sink) = vesHvWithStoringSink()
+
+            // just connecting should not create sink
+            sut.handleConnection()
+            sut.close().unsafeRunSync()
+
+            // then
+            assertThat(sink.closed).isFalse()
+        }
+
         it("should close sink when closing collector provider") {
-            val (sut, _) = vesHvWithStoringSink()
+            val (sut, sink) = vesHvWithStoringSink()
+            // given Sink initialized
+            // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
+            sut.handleConnection(vesWireFrameMessage(PERF3GPP))
 
-            sut.close()
+            // when
+            sut.close().unsafeRunSync()
 
-            assertThat(sut.sinkProvider.closed).isTrue()
+            // then
+            assertThat(sink.closed).isTrue()
         }
     }
 
@@ -145,14 +161,14 @@ object VesHvSpecification : Spek({
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
-            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
+            assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
         }
 
         it("should be able to direct 2 messages from different domains to one topic") {
             val (sut, sink) = vesHvWithStoringSink()
 
-            sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
+            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
 
             val messages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP),
@@ -161,14 +177,14 @@ object VesHvSpecification : Spek({
 
             assertThat(messages).describedAs("number of routed messages").hasSize(3)
 
-            assertThat(messages[0].topic).describedAs("first message topic")
+            assertThat(messages[0].targetTopic).describedAs("first message topic")
                     .isEqualTo(PERF3GPP_TOPIC)
 
-            assertThat(messages[1].topic).describedAs("second message topic")
+            assertThat(messages[1].targetTopic).describedAs("second message topic")
                     .isEqualTo(PERF3GPP_TOPIC)
 
-            assertThat(messages[2].topic).describedAs("last message topic")
-                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+            assertThat(messages[2].targetTopic).describedAs("last message topic")
+                    .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
         }
 
         it("should drop message if route was not found") {
@@ -181,7 +197,7 @@ object VesHvSpecification : Spek({
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+            assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
         }
     }
@@ -205,7 +221,7 @@ object VesHvSpecification : Spek({
             it("should update collector") {
                 val firstCollector = sut.collector
 
-                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                sut.configurationProvider.updateConfiguration(alternativeRouting)
                 val collectorAfterUpdate = sut.collector
 
                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,21 +229,21 @@ object VesHvSpecification : Spek({
 
             it("should start routing messages") {
 
-                sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
+                sut.configurationProvider.updateConfiguration(emptyRouting)
 
                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messages).isEmpty()
 
-                sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+                sut.configurationProvider.updateConfiguration(basicRouting)
 
                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messagesAfterUpdate).hasSize(1)
                 val message = messagesAfterUpdate[0]
 
-                assertThat(message.topic).describedAs("routed message topic after configuration's change")
+                assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
                         .isEqualTo(PERF3GPP_TOPIC)
                 assertThat(message.partition).describedAs("routed message partition")
-                        .isEqualTo(0)
+                        .isEqualTo(None)
             }
 
             it("should change domain routing") {
@@ -236,22 +252,22 @@ object VesHvSpecification : Spek({
                 assertThat(messages).hasSize(1)
                 val firstMessage = messages[0]
 
-                assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+                assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
                         .isEqualTo(PERF3GPP_TOPIC)
                 assertThat(firstMessage.partition).describedAs("routed message partition")
-                        .isEqualTo(0)
+                        .isEqualTo(None)
 
 
-                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                sut.configurationProvider.updateConfiguration(alternativeRouting)
 
                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messagesAfterUpdate).hasSize(2)
                 val secondMessage = messagesAfterUpdate[1]
 
-                assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+                assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
                         .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
                 assertThat(secondMessage.partition).describedAs("routed message partition")
-                        .isEqualTo(0)
+                        .isEqualTo(None)
             }
 
             it("should update routing for each client sending one message") {
@@ -261,7 +277,7 @@ object VesHvSpecification : Spek({
 
                 Flux.range(0, messagesAmount).doOnNext {
                     if (it == messagesForEachTopic) {
-                        sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                        sut.configurationProvider.updateConfiguration(alternativeRouting)
                     }
                 }.doOnNext {
                     sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -269,8 +285,8 @@ object VesHvSpecification : Spek({
 
 
                 val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+                val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
 
                 assertThat(messages.size).isEqualTo(messagesAmount)
                 assertThat(messagesForEachTopic)
@@ -287,7 +303,7 @@ object VesHvSpecification : Spek({
                 val incomingMessages = Flux.range(0, messageStreamSize)
                         .doOnNext {
                             if (it == pivot) {
-                                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                                sut.configurationProvider.updateConfiguration(alternativeRouting)
                                 println("config changed")
                             }
                         }
@@ -297,8 +313,8 @@ object VesHvSpecification : Spek({
                 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
 
                 val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+                val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
 
                 assertThat(messages.size).isEqualTo(messageStreamSize)
                 assertThat(firstTopicMessagesCount)
@@ -320,7 +336,7 @@ object VesHvSpecification : Spek({
         given("failed configuration change") {
             val (sut, _) = vesHvWithStoringSink()
             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
-            sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+            sut.configurationProvider.updateConfiguration(basicRouting)
 
             it("should mark the application unhealthy ") {
                 assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +365,6 @@ object VesHvSpecification : Spek({
 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
     val sink = StoringSink()
     val sut = Sut(sink)
-    sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+    sut.configurationProvider.updateConfiguration(basicRouting)
     return Pair(sut, sink)
 }