2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.tests.component
 
  22 import org.assertj.core.api.Assertions.assertThat
 
  23 import org.jetbrains.spek.api.Spek
 
  24 import org.jetbrains.spek.api.dsl.describe
 
  25 import org.jetbrains.spek.api.dsl.given
 
  26 import org.jetbrains.spek.api.dsl.it
 
  27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
 
  28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 
  29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 
  30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 
  31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 
  32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
 
  33 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 
  34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 
  35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 
  36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 
  37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
 
  38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
 
  39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
 
  40 import org.onap.dcae.collectors.veshv.tests.utils.*
 
  42 import reactor.core.publisher.Flux
 
  43 import java.time.Duration
 
  46  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  49 object VesHvSpecification : Spek({
 
  52     describe("VES High Volume Collector") {
 
  53         it("should handle multiple HV RAN events") {
 
  54             val (sut, sink) = vesHvWithStoringSink()
 
  55             val messages = sut.handleConnection(sink,
 
  56                     vesWireFrameMessage(PERF3GPP),
 
  57                     vesWireFrameMessage(PERF3GPP)
 
  61                     .describedAs("should send all events")
 
  65         it("should close sink when closing collector provider") {
 
  66             val (sut, _) = vesHvWithStoringSink()
 
  70             assertThat(sut.sinkProvider.closed).isTrue()
 
  74     describe("Memory management") {
 
  75         it("should release memory for each handled and dropped message") {
 
  76             val (sut, sink) = vesHvWithStoringSink()
 
  77             val validMessage = vesWireFrameMessage(PERF3GPP)
 
  78             val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
 
  79             val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
 
  80             val expectedRefCnt = 0
 
  82             val handledEvents = sut.handleConnection(
 
  83                     sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
 
  85             assertThat(handledEvents).hasSize(1)
 
  87             assertThat(validMessage.refCnt())
 
  88                     .describedAs("handled message should be released")
 
  89                     .isEqualTo(expectedRefCnt)
 
  90             assertThat(msgWithInvalidFrame.refCnt())
 
  91                     .describedAs("message with invalid frame should be released")
 
  92                     .isEqualTo(expectedRefCnt)
 
  93             assertThat(msgWithTooBigPayload.refCnt())
 
  94                     .describedAs("message with payload exceeding 1MiB should be released")
 
  95                     .isEqualTo(expectedRefCnt)
 
  98         it("should release memory for each message with invalid payload") {
 
  99             val (sut, sink) = vesHvWithStoringSink()
 
 100             val validMessage = vesWireFrameMessage(PERF3GPP)
 
 101             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
 
 102             val expectedRefCnt = 0
 
 104             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
 
 106             assertThat(handledEvents).hasSize(1)
 
 108             assertThat(validMessage.refCnt())
 
 109                     .describedAs("handled message should be released")
 
 110                     .isEqualTo(expectedRefCnt)
 
 111             assertThat(msgWithInvalidPayload.refCnt())
 
 112                     .describedAs("message with invalid payload should be released")
 
 113                     .isEqualTo(expectedRefCnt)
 
 117         it("should release memory for each message with garbage frame") {
 
 118             val (sut, sink) = vesHvWithStoringSink()
 
 119             val validMessage = vesWireFrameMessage(PERF3GPP)
 
 120             val msgWithGarbageFrame = garbageFrame()
 
 121             val expectedRefCnt = 0
 
 123             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
 
 125             assertThat(handledEvents).hasSize(1)
 
 127             assertThat(validMessage.refCnt())
 
 128                     .describedAs("handled message should be released")
 
 129                     .isEqualTo(expectedRefCnt)
 
 130             assertThat(msgWithGarbageFrame.refCnt())
 
 131                     .describedAs("message with garbage frame should be released")
 
 132                     .isEqualTo(expectedRefCnt)
 
 137     describe("message routing") {
 
 138         it("should direct message to a topic by means of routing configuration") {
 
 139             val (sut, sink) = vesHvWithStoringSink()
 
 141             val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
 
 142             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
 144             val msg = messages[0]
 
 145             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
 
 146             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
 
 149         it("should be able to direct 2 messages from different domains to one topic") {
 
 150             val (sut, sink) = vesHvWithStoringSink()
 
 152             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
 
 154             val messages = sut.handleConnection(sink,
 
 155                     vesWireFrameMessage(PERF3GPP),
 
 156                     vesWireFrameMessage(HEARTBEAT),
 
 157                     vesWireFrameMessage(MEASUREMENT))
 
 159             assertThat(messages).describedAs("number of routed messages").hasSize(3)
 
 161             assertThat(messages[0].topic).describedAs("first message topic")
 
 162                     .isEqualTo(PERF3GPP_TOPIC)
 
 164             assertThat(messages[1].topic).describedAs("second message topic")
 
 165                     .isEqualTo(PERF3GPP_TOPIC)
 
 167             assertThat(messages[2].topic).describedAs("last message topic")
 
 168                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
 
 171         it("should drop message if route was not found") {
 
 172             val (sut, sink) = vesHvWithStoringSink()
 
 173             val messages = sut.handleConnection(sink,
 
 174                     vesWireFrameMessage(OTHER, "first"),
 
 175                     vesWireFrameMessage(PERF3GPP, "second"),
 
 176                     vesWireFrameMessage(HEARTBEAT, "third"))
 
 178             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
 180             val msg = messages[0]
 
 181             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
 
 182             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
 
 186     describe("configuration update") {
 
 188         val defaultTimeout = Duration.ofSeconds(10)
 
 190         given("successful configuration change") {
 
 192             lateinit var sut: Sut
 
 193             lateinit var sink: StoringSink
 
 196                 vesHvWithStoringSink().run {
 
 202             it("should update collector") {
 
 203                 val firstCollector = sut.collector
 
 205                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 206                 val collectorAfterUpdate = sut.collector
 
 208                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
 
 211             it("should start routing messages") {
 
 213                 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
 
 215                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
 
 216                 assertThat(messages).isEmpty()
 
 218                 sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 220                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
 
 221                 assertThat(messagesAfterUpdate).hasSize(1)
 
 222                 val message = messagesAfterUpdate[0]
 
 224                 assertThat(message.topic).describedAs("routed message topic after configuration's change")
 
 225                         .isEqualTo(PERF3GPP_TOPIC)
 
 226                 assertThat(message.partition).describedAs("routed message partition")
 
 230             it("should change domain routing") {
 
 232                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
 
 233                 assertThat(messages).hasSize(1)
 
 234                 val firstMessage = messages[0]
 
 236                 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
 
 237                         .isEqualTo(PERF3GPP_TOPIC)
 
 238                 assertThat(firstMessage.partition).describedAs("routed message partition")
 
 242                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 244                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
 
 245                 assertThat(messagesAfterUpdate).hasSize(2)
 
 246                 val secondMessage = messagesAfterUpdate[1]
 
 248                 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
 
 249                         .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
 
 250                 assertThat(secondMessage.partition).describedAs("routed message partition")
 
 254             it("should update routing for each client sending one message") {
 
 256                 val messagesAmount = 10
 
 257                 val messagesForEachTopic = 5
 
 259                 Flux.range(0, messagesAmount).doOnNext {
 
 260                     if (it == messagesForEachTopic) {
 
 261                         sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 264                     sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
 
 265                 }.then().block(defaultTimeout)
 
 268                 val messages = sink.sentMessages
 
 269                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
 
 270                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
 
 272                 assertThat(messages.size).isEqualTo(messagesAmount)
 
 273                 assertThat(messagesForEachTopic)
 
 274                         .describedAs("amount of messages routed to each topic")
 
 275                         .isEqualTo(firstTopicMessagesCount)
 
 276                         .isEqualTo(secondTopicMessagesCount)
 
 279             it("should not update routing for client sending continuous stream of messages") {
 
 281                 val messageStreamSize = 10
 
 284                 val incomingMessages = Flux.range(0, messageStreamSize)
 
 287                                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 288                                 println("config changed")
 
 291                         .map { vesWireFrameMessage(PERF3GPP) }
 
 294                 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
 
 296                 val messages = sink.sentMessages
 
 297                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
 
 298                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
 
 300                 assertThat(messages.size).isEqualTo(messageStreamSize)
 
 301                 assertThat(firstTopicMessagesCount)
 
 302                         .describedAs("amount of messages routed to first topic")
 
 303                         .isEqualTo(messageStreamSize)
 
 305                 assertThat(secondTopicMessagesCount)
 
 306                         .describedAs("amount of messages routed to second topic")
 
 310             it("should mark the application healthy") {
 
 311                 assertThat(sut.healthStateProvider.currentHealth)
 
 312                         .describedAs("application health state")
 
 313                         .isEqualTo(HealthDescription.HEALTHY)
 
 317         given("failed configuration change") {
 
 318             val (sut, _) = vesHvWithStoringSink()
 
 319             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
 
 320             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 322             it("should mark the application unhealthy ") {
 
 323                 assertThat(sut.healthStateProvider.currentHealth)
 
 324                         .describedAs("application health state")
 
 325                         .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
 
 330     describe("request validation") {
 
 331         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
 
 332             val (sut, sink) = vesHvWithStoringSink()
 
 334             val handledMessages = sut.handleConnection(sink,
 
 335                     vesWireFrameMessage(PERF3GPP, "first"),
 
 336                     messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
 
 337                     vesWireFrameMessage(PERF3GPP))
 
 339             assertThat(handledMessages).hasSize(1)
 
 340             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
 
 346 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
 
 347     val sink = StoringSink()
 
 349     sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 350     return Pair(sut, sink)