HV-VES Domain update
[dcaegen2/collectors/hv-ves.git] / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / VesHvSpecification.kt
index 26032ff..e9c0d67 100644 (file)
@@ -22,74 +22,326 @@ package org.onap.dcae.collectors.veshv.tests.component
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
+import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+
+import reactor.core.publisher.Flux
+import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
 object VesHvSpecification : Spek({
+    debugRx(false)
+
     describe("VES High Volume Collector") {
-        system("should handle multiple HV RAN events") { sut ->
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
-            val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
+        it("should handle multiple HV RAN events") {
+            val (sut, sink) = vesHvWithStoringSink()
+            val messages = sut.handleConnection(sink,
+                    vesWireFrameMessage(PERF3GPP),
+                    vesWireFrameMessage(PERF3GPP)
+            )
 
             assertThat(messages)
                     .describedAs("should send all events")
                     .hasSize(2)
         }
+    }
 
-        system("should release memory for each incoming message") { sut ->
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
-            val msgWithInvalidDomain = vesMessage(Domain.OTHER)
-            val msgWithInvalidPayload = invalidVesMessage()
+    describe("Memory management") {
+        it("should release memory for each handled and dropped message") {
+            val (sut, sink) = vesHvWithStoringSink()
+            val validMessage = vesWireFrameMessage(PERF3GPP)
             val msgWithInvalidFrame = invalidWireFrame()
-            val validMessage = vesMessage(Domain.HVRANMEAS)
-            val refCntBeforeSending = msgWithInvalidDomain.refCnt()
+            val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP)
+            val expectedRefCnt = 0
 
-            sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
+            val handledEvents = sut.handleConnection(
+                    sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
 
-            assertThat(msgWithInvalidDomain.refCnt())
-                    .describedAs("message with invalid domain should be released")
-                    .isEqualTo(refCntBeforeSending)
-            assertThat(msgWithInvalidPayload.refCnt())
-                    .describedAs("message with invalid payload should be released")
-                    .isEqualTo(refCntBeforeSending)
+            assertThat(handledEvents).hasSize(1)
+
+            assertThat(validMessage.refCnt())
+                    .describedAs("handled message should be released")
+                    .isEqualTo(expectedRefCnt)
             assertThat(msgWithInvalidFrame.refCnt())
                     .describedAs("message with invalid frame should be released")
-                    .isEqualTo(refCntBeforeSending)
+                    .isEqualTo(expectedRefCnt)
+            assertThat(msgWithTooBigPayload.refCnt())
+                    .describedAs("message with payload exceeding 1MiB should be released")
+                    .isEqualTo(expectedRefCnt)
+        }
+
+        it("should release memory for each message with invalid payload") {
+            val (sut, sink) = vesHvWithStoringSink()
+            val validMessage = vesWireFrameMessage(PERF3GPP)
+            val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
+            val expectedRefCnt = 0
+
+            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
+
+            assertThat(handledEvents).hasSize(1)
+
             assertThat(validMessage.refCnt())
                     .describedAs("handled message should be released")
-                    .isEqualTo(refCntBeforeSending)
+                    .isEqualTo(expectedRefCnt)
+            assertThat(msgWithInvalidPayload.refCnt())
+                    .describedAs("message with invalid payload should be released")
+                    .isEqualTo(expectedRefCnt)
+
+        }
+
+        it("should release memory for each message with garbage frame") {
+            val (sut, sink) = vesHvWithStoringSink()
+            val validMessage = vesWireFrameMessage(PERF3GPP)
+            val msgWithGarbageFrame = garbageFrame()
+            val expectedRefCnt = 0
+
+            val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
+
+            assertThat(handledEvents).hasSize(1)
+
+            assertThat(validMessage.refCnt())
+                    .describedAs("handled message should be released")
+                    .isEqualTo(expectedRefCnt)
+            assertThat(msgWithGarbageFrame.refCnt())
+                    .describedAs("message with garbage frame should be released")
+                    .isEqualTo(expectedRefCnt)
+
         }
     }
 
     describe("message routing") {
-        system("should direct message to a topic by means of routing configuration") { sut ->
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+        it("should direct message to a topic by means of routing configuration") {
+            val (sut, sink) = vesHvWithStoringSink()
 
-            val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS))
+            val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
-            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1)
+            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
         }
 
-        system("should drop message if route was not found") { sut ->
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
-            val messages = sut.handleConnection(
-                    vesMessage(Domain.OTHER, "first"),
-                    vesMessage(Domain.HVRANMEAS, "second"),
-                    vesMessage(Domain.HEARTBEAT, "third"))
+        it("should be able to direct 2 messages from different domains to one topic") {
+            val (sut, sink) = vesHvWithStoringSink()
+
+            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+
+            val messages = sut.handleConnection(sink,
+                    vesWireFrameMessage(PERF3GPP),
+                    vesWireFrameMessage(HEARTBEAT),
+                    vesWireFrameMessage(MEASUREMENT))
+
+            assertThat(messages).describedAs("number of routed messages").hasSize(3)
+
+            assertThat(messages[0].topic).describedAs("first message topic")
+                    .isEqualTo(PERF3GPP_TOPIC)
+
+            assertThat(messages[1].topic).describedAs("second message topic")
+                    .isEqualTo(PERF3GPP_TOPIC)
+
+            assertThat(messages[2].topic).describedAs("last message topic")
+                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+        }
+
+        it("should drop message if route was not found") {
+            val (sut, sink) = vesHvWithStoringSink()
+            val messages = sut.handleConnection(sink,
+                    vesWireFrameMessage(OTHER, "first"),
+                    vesWireFrameMessage(PERF3GPP, "second"),
+                    vesWireFrameMessage(HEARTBEAT, "third"))
 
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
         }
     }
+
+    describe("configuration update") {
+
+        val defaultTimeout = Duration.ofSeconds(10)
+
+        given("successful configuration change") {
+
+            lateinit var sut: Sut
+            lateinit var sink: StoringSink
+
+            beforeEachTest {
+                vesHvWithStoringSink().run {
+                    sut = first
+                    sink = second
+                }
+            }
+
+            it("should update collector") {
+                val firstCollector = sut.collector
+
+                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                val collectorAfterUpdate = sut.collector
+
+                assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+            }
+
+            it("should start routing messages") {
+
+                sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+                assertThat(messages).isEmpty()
+
+                sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+                assertThat(messagesAfterUpdate).hasSize(1)
+                val message = messagesAfterUpdate[0]
+
+                assertThat(message.topic).describedAs("routed message topic after configuration's change")
+                        .isEqualTo(PERF3GPP_TOPIC)
+                assertThat(message.partition).describedAs("routed message partition")
+                        .isEqualTo(0)
+            }
+
+            it("should change domain routing") {
+
+                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+                assertThat(messages).hasSize(1)
+                val firstMessage = messages[0]
+
+                assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+                        .isEqualTo(PERF3GPP_TOPIC)
+                assertThat(firstMessage.partition).describedAs("routed message partition")
+                        .isEqualTo(0)
+
+
+                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+                assertThat(messagesAfterUpdate).hasSize(2)
+                val secondMessage = messagesAfterUpdate[1]
+
+                assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+                        .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
+                assertThat(secondMessage.partition).describedAs("routed message partition")
+                        .isEqualTo(0)
+            }
+
+            it("should update routing for each client sending one message") {
+
+                val messagesAmount = 10
+                val messagesForEachTopic = 5
+
+                Flux.range(0, messagesAmount).doOnNext {
+                    if (it == messagesForEachTopic) {
+                        sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                    }
+                }.doOnNext {
+                    sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+                }.then().block(defaultTimeout)
+
+
+                val messages = sink.sentMessages
+                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+
+                assertThat(messages.size).isEqualTo(messagesAmount)
+                assertThat(messagesForEachTopic)
+                        .describedAs("amount of messages routed to each topic")
+                        .isEqualTo(firstTopicMessagesCount)
+                        .isEqualTo(secondTopicMessagesCount)
+            }
+
+            it("should not update routing for client sending continuous stream of messages") {
+
+                val messageStreamSize = 10
+                val pivot = 5
+
+                val incomingMessages = Flux.range(0, messageStreamSize)
+                        .doOnNext {
+                            if (it == pivot) {
+                                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                                println("config changed")
+                            }
+                        }
+                        .map { vesWireFrameMessage(PERF3GPP) }
+
+
+                sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+                val messages = sink.sentMessages
+                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+
+                assertThat(messages.size).isEqualTo(messageStreamSize)
+                assertThat(firstTopicMessagesCount)
+                        .describedAs("amount of messages routed to first topic")
+                        .isEqualTo(messageStreamSize)
+
+                assertThat(secondTopicMessagesCount)
+                        .describedAs("amount of messages routed to second topic")
+                        .isEqualTo(0)
+            }
+
+            it("should mark the application healthy") {
+                assertThat(sut.healthStateProvider.currentHealth)
+                        .describedAs("application health state")
+                        .isEqualTo(HealthDescription.HEALTHY)
+            }
+        }
+
+        given("failed configuration change") {
+            val (sut, _) = vesHvWithStoringSink()
+            sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            it("should mark the application unhealthy ") {
+                assertThat(sut.healthStateProvider.currentHealth)
+                        .describedAs("application health state")
+                        .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+            }
+        }
+    }
+
+    describe("request validation") {
+        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
+            val (sut, sink) = vesHvWithStoringSink()
+
+            val handledMessages = sut.handleConnection(sink,
+                    vesWireFrameMessage(PERF3GPP, "first"),
+                    vesMessageWithTooBigPayload(PERF3GPP),
+                    vesWireFrameMessage(PERF3GPP))
+
+            assertThat(handledMessages).hasSize(1)
+            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
+        }
+    }
+
 })
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+    val sink = StoringSink()
+    val sut = Sut(sink)
+    sut.configurationProvider.updateConfiguration(basicConfiguration)
+    return Pair(sut, sink)
+}