2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.tests.component
 
  22 import org.assertj.core.api.Assertions.assertThat
 
  23 import org.jetbrains.spek.api.Spek
 
  24 import org.jetbrains.spek.api.dsl.describe
 
  25 import org.jetbrains.spek.api.dsl.given
 
  26 import org.jetbrains.spek.api.dsl.it
 
  27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
 
  28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
 
  29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 
  30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 
  31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 
  32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
 
  33 import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
 
  34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 
  35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 
  36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
 
  37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
 
  38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
 
  39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
 
  40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 
  41 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
 
  42 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
 
  43 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
 
  44 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
 
  46 import reactor.core.publisher.Flux
 
  47 import java.time.Duration
 
  50  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  53 object VesHvSpecification : Spek({
 
  56     describe("VES High Volume Collector") {
 
  57         it("should handle multiple HV RAN events") {
 
  58             val (sut, sink) = vesHvWithStoringSink()
 
  59             val messages = sut.handleConnection(sink,
 
  60                     vesWireFrameMessage(HVMEAS),
 
  61                     vesWireFrameMessage(HVMEAS)
 
  65                     .describedAs("should send all events")
 
  70     describe("Memory management") {
 
  71         it("should release memory for each handled and dropped message") {
 
  72             val (sut, sink) = vesHvWithStoringSink()
 
  73             val validMessage = vesWireFrameMessage(HVMEAS)
 
  74             val msgWithInvalidFrame = invalidWireFrame()
 
  75             val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
 
  76             val expectedRefCnt = 0
 
  78             val handledEvents = sut.handleConnection(
 
  79                     sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
 
  81             assertThat(handledEvents).hasSize(1)
 
  83             assertThat(validMessage.refCnt())
 
  84                     .describedAs("handled message should be released")
 
  85                     .isEqualTo(expectedRefCnt)
 
  86             assertThat(msgWithInvalidFrame.refCnt())
 
  87                     .describedAs("message with invalid frame should be released")
 
  88                     .isEqualTo(expectedRefCnt)
 
  89             assertThat(msgWithTooBigPayload.refCnt())
 
  90                     .describedAs("message with payload exceeding 1MiB should be released")
 
  91                     .isEqualTo(expectedRefCnt)
 
  94         it("should release memory for each message with invalid payload") {
 
  95             val (sut, sink) = vesHvWithStoringSink()
 
  96             val validMessage = vesWireFrameMessage(HVMEAS)
 
  97             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
 
  98             val expectedRefCnt = 0
 
 100             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
 
 102             assertThat(handledEvents).hasSize(1)
 
 104             assertThat(validMessage.refCnt())
 
 105                     .describedAs("handled message should be released")
 
 106                     .isEqualTo(expectedRefCnt)
 
 107             assertThat(msgWithInvalidPayload.refCnt())
 
 108                     .describedAs("message with invalid payload should be released")
 
 109                     .isEqualTo(expectedRefCnt)
 
 113         it("should release memory for each message with garbage frame") {
 
 114             val (sut, sink) = vesHvWithStoringSink()
 
 115             val validMessage = vesWireFrameMessage(HVMEAS)
 
 116             val msgWithGarbageFrame = garbageFrame()
 
 117             val expectedRefCnt = 0
 
 119             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
 
 121             assertThat(handledEvents).hasSize(1)
 
 123             assertThat(validMessage.refCnt())
 
 124                     .describedAs("handled message should be released")
 
 125                     .isEqualTo(expectedRefCnt)
 
 126             assertThat(msgWithGarbageFrame.refCnt())
 
 127                     .describedAs("message with garbage frame should be released")
 
 128                     .isEqualTo(expectedRefCnt)
 
 133     describe("message routing") {
 
 134         it("should direct message to a topic by means of routing configuration") {
 
 135             val (sut, sink) = vesHvWithStoringSink()
 
 137             val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
 
 138             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
 140             val msg = messages[0]
 
 141             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
 
 142             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
 
 145         it("should be able to direct 2 messages from different domains to one topic") {
 
 146             val (sut, sink) = vesHvWithStoringSink()
 
 148             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
 
 150             val messages = sut.handleConnection(sink,
 
 151                     vesWireFrameMessage(HVMEAS),
 
 152                     vesWireFrameMessage(HEARTBEAT),
 
 153                     vesWireFrameMessage(MEASUREMENT))
 
 155             assertThat(messages).describedAs("number of routed messages").hasSize(3)
 
 157             assertThat(messages[0].topic).describedAs("first message topic")
 
 158                     .isEqualTo(HVMEAS_TOPIC)
 
 160             assertThat(messages[1].topic).describedAs("second message topic")
 
 161                     .isEqualTo(HVMEAS_TOPIC)
 
 163             assertThat(messages[2].topic).describedAs("last message topic")
 
 164                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
 
 167         it("should drop message if route was not found") {
 
 168             val (sut, sink) = vesHvWithStoringSink()
 
 169             val messages = sut.handleConnection(sink,
 
 170                     vesWireFrameMessage(OTHER, "first"),
 
 171                     vesWireFrameMessage(HVMEAS, "second"),
 
 172                     vesWireFrameMessage(HEARTBEAT, "third"))
 
 174             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
 176             val msg = messages[0]
 
 177             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
 
 178             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
 
 182     describe("configuration update") {
 
 184         val defaultTimeout = Duration.ofSeconds(10)
 
 186         given("successful configuration change") {
 
 188             lateinit var sut: Sut
 
 189             lateinit var sink: StoringSink
 
 192                 vesHvWithStoringSink().run {
 
 198             it("should update collector") {
 
 199                 val firstCollector = sut.collector
 
 201                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 202                 val collectorAfterUpdate = sut.collector
 
 204                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
 
 207             it("should start routing messages") {
 
 209                 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
 
 211                 val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
 
 212                 assertThat(messages).isEmpty()
 
 214                 sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 216                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
 
 217                 assertThat(messagesAfterUpdate).hasSize(1)
 
 218                 val message = messagesAfterUpdate[0]
 
 220                 assertThat(message.topic).describedAs("routed message topic after configuration's change")
 
 221                         .isEqualTo(HVMEAS_TOPIC)
 
 222                 assertThat(message.partition).describedAs("routed message partition")
 
 226             it("should change domain routing") {
 
 228                 val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
 
 229                 assertThat(messages).hasSize(1)
 
 230                 val firstMessage = messages[0]
 
 232                 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
 
 233                         .isEqualTo(HVMEAS_TOPIC)
 
 234                 assertThat(firstMessage.partition).describedAs("routed message partition")
 
 238                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 240                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
 
 241                 assertThat(messagesAfterUpdate).hasSize(2)
 
 242                 val secondMessage = messagesAfterUpdate[1]
 
 244                 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
 
 245                         .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
 
 246                 assertThat(secondMessage.partition).describedAs("routed message partition")
 
 250             it("should update routing for each client sending one message") {
 
 252                 val messagesAmount = 10
 
 253                 val messagesForEachTopic = 5
 
 255                 Flux.range(0, messagesAmount).doOnNext {
 
 256                     if (it == messagesForEachTopic) {
 
 257                         sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 260                     sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
 
 261                 }.then().block(defaultTimeout)
 
 264                 val messages = sink.sentMessages
 
 265                 val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
 
 266                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
 
 268                 assertThat(messages.size).isEqualTo(messagesAmount)
 
 269                 assertThat(messagesForEachTopic)
 
 270                         .describedAs("amount of messages routed to each topic")
 
 271                         .isEqualTo(firstTopicMessagesCount)
 
 272                         .isEqualTo(secondTopicMessagesCount)
 
 275             it("should not update routing for client sending continuous stream of messages") {
 
 277                 val messageStreamSize = 10
 
 280                 val incomingMessages = Flux.range(0, messageStreamSize)
 
 283                                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
 284                                 println("config changed")
 
 287                         .map { vesWireFrameMessage(HVMEAS) }
 
 290                 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
 
 292                 val messages = sink.sentMessages
 
 293                 val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
 
 294                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
 
 296                 assertThat(messages.size).isEqualTo(messageStreamSize)
 
 297                 assertThat(firstTopicMessagesCount)
 
 298                         .describedAs("amount of messages routed to first topic")
 
 299                         .isEqualTo(messageStreamSize)
 
 301                 assertThat(secondTopicMessagesCount)
 
 302                         .describedAs("amount of messages routed to second topic")
 
 306             it("should mark the application healthy") {
 
 307                 assertThat(sut.healthStateProvider.currentHealth)
 
 308                         .describedAs("application health state")
 
 309                         .isEqualTo(HealthDescription.HEALTHY)
 
 313         given("failed configuration change") {
 
 314             val (sut, _) = vesHvWithStoringSink()
 
 315             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
 
 316             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 318             it("should mark the application unhealthy ") {
 
 319                 assertThat(sut.healthStateProvider.currentHealth)
 
 320                         .describedAs("application health state")
 
 321                         .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
 
 326     describe("request validation") {
 
 327         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
 
 328             val (sut, sink) = vesHvWithStoringSink()
 
 330             val handledMessages = sut.handleConnection(sink,
 
 331                     vesWireFrameMessage(HVMEAS, "first"),
 
 332                     vesMessageWithTooBigPayload(HVMEAS),
 
 333                     vesWireFrameMessage(HVMEAS))
 
 335             assertThat(handledMessages).hasSize(1)
 
 336             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
 
 342 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
 
 343     val sink = StoringSink()
 
 345     sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 346     return Pair(sut, sink)