Enhancement of routing test 53/58653/1
authorkjaniak <kornel.janiak@nokia.com>
Thu, 5 Jul 2018 06:13:59 +0000 (08:13 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 09:48:39 +0000 (11:48 +0200)
Change-Id: I432c09a219928d1a89a0c619db0e8cc3104af0ec
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt

index 12e1c1e..3f355e3 100644 (file)
@@ -27,20 +27,18 @@ internal class MessageValidator {
     val requiredFieldDescriptors = listOf(
             "version",
             "eventName",
-            "domain",
+            // "domain", TODO to be restored back when GPB schema will include default value
             "eventId",
             "sourceName",
             "reportingEntityName",
-            "priority",
+            // "priority", TODO to be restored back when GPB schema will include default value
             "startEpochMicrosec",
             "lastEpochMicrosec",
             "sequence")
     .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)}
 
     fun isValid(message: VesMessage): Boolean {
-        val header = message.header
-        val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
-        return ret
+        return allMandatoryFieldsArePresent(message.header)
     }
 
     private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
index 017187a..4d1b879 100644 (file)
@@ -67,17 +67,16 @@ internal object MessageValidatorTest : Spek({
                 assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
             }
 
-            it("should reject message with domain other than HVRANMEAS") {
-                Domain.values()
-                        .filter { it != Domain.HVRANMEAS && it != Domain.UNRECOGNIZED }
-                        .forEach { domain ->
+            Domain.values()
+                    .filter { it != Domain.UNRECOGNIZED }
+                    .forEach {domain ->
+                        it("should accept message with $domain domain"){
                             val header = newBuilder(commonHeader).setDomain(domain).build()
                             val vesMessage = VesMessage(header, vesMessageBytes(header))
                             assertThat(cut.isValid(vesMessage))
-                                    .describedAs("message with $domain domain")
-                                    .isFalse()
+                                    .isTrue()
                         }
-            }
+                    }
         }
 
         on("ves hv message bytes") {
index 44b3266..d78463b 100644 (file)
@@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import java.time.Duration
 
index d917c71..aa5810d 100644 (file)
@@ -23,9 +23,10 @@ import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.Routing
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.tests.fakes.*
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 
 /**
@@ -137,6 +138,29 @@ object VesHvSpecification : Spek({
             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
         }
 
+        it("should be able to direct 2 messages from different domains to one topic") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+
+            val messages = sut.handleConnection(sink,
+                    vesMessage(Domain.HVRANMEAS),
+                    vesMessage(Domain.HEARTBEAT),
+                    vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+
+            assertThat(messages).describedAs("number of routed messages").hasSize(3)
+
+            assertThat(messages.get(0).topic).describedAs("first message topic")
+                    .isEqualTo(HVRANMEAS_TOPIC)
+
+            assertThat(messages.get(1).topic).describedAs("second message topic")
+                    .isEqualTo(HVRANMEAS_TOPIC)
+
+            assertThat(messages.get(2).topic).describedAs("last message topic")
+                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+        }
+        
         it("should drop message if route was not found") {
             val sink = StoringSink()
             val sut = Sut(sink)
@@ -169,4 +193,5 @@ object VesHvSpecification : Spek({
             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
         }
     }
+
 })
index 82226dc..47c31c6 100644 (file)
@@ -28,6 +28,7 @@ import reactor.core.publisher.UnicastProcessor
 
 
 const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
 
 val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
         kafkaBootstrapServers = "localhost:9969",
@@ -40,6 +41,27 @@ val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
         }.build()
 )
 
+val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
+        kafkaBootstrapServers = "localhost:9969",
+        routing = routing {
+            defineRoute {
+                fromDomain(Domain.HVRANMEAS)
+                toTopic(HVRANMEAS_TOPIC)
+                withFixedPartitioning()
+            }
+            defineRoute {
+                fromDomain(Domain.HEARTBEAT)
+                toTopic(HVRANMEAS_TOPIC)
+                withFixedPartitioning()
+            }
+            defineRoute {
+                fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+                toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+                withFixedPartitioning()
+            }
+        }.build()
+)
+
 
 class FakeConfigurationProvider : ConfigurationProvider {
     private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()