import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
+import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
+import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+
+import reactor.core.publisher.Flux
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
object VesHvSpecification : Spek({
+ debugRx(false)
+
describe("VES High Volume Collector") {
- system("should handle multiple HV RAN events") { sut ->
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
+ it("should handle multiple HV RAN events") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val messages = sut.handleConnection(sink,
+ vesWireFrameMessage(PERF3GPP),
+ vesWireFrameMessage(PERF3GPP)
+ )
assertThat(messages)
.describedAs("should send all events")
.hasSize(2)
}
+ }
- system("should release memory for each incoming message") { sut ->
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- val msgWithInvalidDomain = vesMessage(Domain.OTHER)
- val msgWithInvalidPayload = invalidVesMessage()
+ describe("Memory management") {
+ it("should release memory for each handled and dropped message") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithInvalidFrame = invalidWireFrame()
- val validMessage = vesMessage(Domain.HVRANMEAS)
- val refCntBeforeSending = msgWithInvalidDomain.refCnt()
+ val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP)
+ val expectedRefCnt = 0
- sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
+ val handledEvents = sut.handleConnection(
+ sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
- assertThat(msgWithInvalidDomain.refCnt())
- .describedAs("message with invalid domain should be released")
- .isEqualTo(refCntBeforeSending)
- assertThat(msgWithInvalidPayload.refCnt())
- .describedAs("message with invalid payload should be released")
- .isEqualTo(refCntBeforeSending)
+ assertThat(handledEvents).hasSize(1)
+
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithTooBigPayload.refCnt())
+ .describedAs("message with payload exceeding 1MiB should be released")
+ .isEqualTo(expectedRefCnt)
+ }
+
+ it("should release memory for each message with invalid payload") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesWireFrameMessage(PERF3GPP)
+ val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
+
+ assertThat(handledEvents).hasSize(1)
+
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithInvalidPayload.refCnt())
+ .describedAs("message with invalid payload should be released")
+ .isEqualTo(expectedRefCnt)
+
+ }
+
+ it("should release memory for each message with garbage frame") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val validMessage = vesWireFrameMessage(PERF3GPP)
+ val msgWithGarbageFrame = garbageFrame()
+ val expectedRefCnt = 0
+
+ val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
+
+ assertThat(handledEvents).hasSize(1)
+
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithGarbageFrame.refCnt())
+ .describedAs("message with garbage frame should be released")
+ .isEqualTo(expectedRefCnt)
+
}
}
describe("message routing") {
- system("should direct message to a topic by means of routing configuration") { sut ->
- sut.configurationProvider.updateConfiguration(basicConfiguration)
+ it("should direct message to a topic by means of routing configuration") {
+ val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
- assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
- system("should drop message if route was not found") { sut ->
- sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messages = sut.handleConnection(
- vesMessage(Domain.OTHER, "first"),
- vesMessage(Domain.HVRANMEAS, "second"),
- vesMessage(Domain.HEARTBEAT, "third"))
+ it("should be able to direct 2 messages from different domains to one topic") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
+
+ val messages = sut.handleConnection(sink,
+ vesWireFrameMessage(PERF3GPP),
+ vesWireFrameMessage(HEARTBEAT),
+ vesWireFrameMessage(MEASUREMENT))
+
+ assertThat(messages).describedAs("number of routed messages").hasSize(3)
+
+ assertThat(messages[0].topic).describedAs("first message topic")
+ .isEqualTo(PERF3GPP_TOPIC)
+
+ assertThat(messages[1].topic).describedAs("second message topic")
+ .isEqualTo(PERF3GPP_TOPIC)
+
+ assertThat(messages[2].topic).describedAs("last message topic")
+ .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+ }
+
+ it("should drop message if route was not found") {
+ val (sut, sink) = vesHvWithStoringSink()
+ val messages = sut.handleConnection(sink,
+ vesWireFrameMessage(OTHER, "first"),
+ vesWireFrameMessage(PERF3GPP, "second"),
+ vesWireFrameMessage(HEARTBEAT, "third"))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
+
+ describe("configuration update") {
+
+ val defaultTimeout = Duration.ofSeconds(10)
+
+ given("successful configuration change") {
+
+ lateinit var sut: Sut
+ lateinit var sink: StoringSink
+
+ beforeEachTest {
+ vesHvWithStoringSink().run {
+ sut = first
+ sink = second
+ }
+ }
+
+ it("should update collector") {
+ val firstCollector = sut.collector
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ val collectorAfterUpdate = sut.collector
+
+ assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+ }
+
+ it("should start routing messages") {
+
+ sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messages).isEmpty()
+
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messagesAfterUpdate).hasSize(1)
+ val message = messagesAfterUpdate[0]
+
+ assertThat(message.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(PERF3GPP_TOPIC)
+ assertThat(message.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should change domain routing") {
+
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messages).hasSize(1)
+ val firstMessage = messages[0]
+
+ assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+ .isEqualTo(PERF3GPP_TOPIC)
+ assertThat(firstMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+
+
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ assertThat(messagesAfterUpdate).hasSize(2)
+ val secondMessage = messagesAfterUpdate[1]
+
+ assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+ .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
+ assertThat(secondMessage.partition).describedAs("routed message partition")
+ .isEqualTo(0)
+ }
+
+ it("should update routing for each client sending one message") {
+
+ val messagesAmount = 10
+ val messagesForEachTopic = 5
+
+ Flux.range(0, messagesAmount).doOnNext {
+ if (it == messagesForEachTopic) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ }
+ }.doOnNext {
+ sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
+ }.then().block(defaultTimeout)
+
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messagesAmount)
+ assertThat(messagesForEachTopic)
+ .describedAs("amount of messages routed to each topic")
+ .isEqualTo(firstTopicMessagesCount)
+ .isEqualTo(secondTopicMessagesCount)
+ }
+
+ it("should not update routing for client sending continuous stream of messages") {
+
+ val messageStreamSize = 10
+ val pivot = 5
+
+ val incomingMessages = Flux.range(0, messageStreamSize)
+ .doOnNext {
+ if (it == pivot) {
+ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+ println("config changed")
+ }
+ }
+ .map { vesWireFrameMessage(PERF3GPP) }
+
+
+ sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+ val messages = sink.sentMessages
+ val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+
+ assertThat(messages.size).isEqualTo(messageStreamSize)
+ assertThat(firstTopicMessagesCount)
+ .describedAs("amount of messages routed to first topic")
+ .isEqualTo(messageStreamSize)
+
+ assertThat(secondTopicMessagesCount)
+ .describedAs("amount of messages routed to second topic")
+ .isEqualTo(0)
+ }
+
+ it("should mark the application healthy") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthDescription.HEALTHY)
+ }
+ }
+
+ given("failed configuration change") {
+ val (sut, _) = vesHvWithStoringSink()
+ sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ it("should mark the application unhealthy ") {
+ assertThat(sut.healthStateProvider.currentHealth)
+ .describedAs("application health state")
+ .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+ }
+ }
+ }
+
+ describe("request validation") {
+ it("should reject message with payload greater than 1 MiB and all subsequent messages") {
+ val (sut, sink) = vesHvWithStoringSink()
+
+ val handledMessages = sut.handleConnection(sink,
+ vesWireFrameMessage(PERF3GPP, "first"),
+ vesMessageWithTooBigPayload(PERF3GPP),
+ vesWireFrameMessage(PERF3GPP))
+
+ assertThat(handledMessages).hasSize(1)
+ assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
+ }
+ }
+
})
+
+private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
+ val sink = StoringSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ return Pair(sut, sink)
+}