2  * ============LICENSE_START=======================================================
 
   3  * dcaegen2-collectors-veshv
 
   4  * ================================================================================
 
   5  * Copyright (C) 2018 NOKIA
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.collectors.veshv.tests.component
 
  22 import org.assertj.core.api.Assertions.assertThat
 
  23 import org.jetbrains.spek.api.Spek
 
  24 import org.jetbrains.spek.api.dsl.describe
 
  25 import org.jetbrains.spek.api.dsl.it
 
  26 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 
  27 import org.onap.dcae.collectors.veshv.model.Routing
 
  28 import org.onap.dcae.collectors.veshv.model.routing
 
  29 import org.onap.dcae.collectors.veshv.tests.fakes.*
 
  30 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 
  33  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 
  36 object VesHvSpecification : Spek({
 
  39     describe("VES High Volume Collector") {
 
  40         it("should handle multiple HV RAN events") {
 
  41             val sink = StoringSink()
 
  43             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
  44             val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
 
  47                     .describedAs("should send all events")
 
  52     describe("Memory management") {
 
  54         it("should release memory for each handled and dropped message") {
 
  55             val sink = StoringSink()
 
  57             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
  58             val validMessage = vesMessage(Domain.HVRANMEAS)
 
  59             val msgWithInvalidDomain = vesMessage(Domain.OTHER)
 
  60             val msgWithInvalidFrame = invalidWireFrame()
 
  61             val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
 
  62             val expectedRefCnt = 0
 
  64             val handledEvents = sut.handleConnection(
 
  65                     sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
 
  67             assertThat(handledEvents).hasSize(1)
 
  69             assertThat(validMessage.refCnt())
 
  70                     .describedAs("handled message should be released")
 
  71                     .isEqualTo(expectedRefCnt)
 
  72             assertThat(msgWithInvalidDomain.refCnt())
 
  73                     .describedAs("message with invalid domain should be released")
 
  74                     .isEqualTo(expectedRefCnt)
 
  75             assertThat(msgWithInvalidFrame.refCnt())
 
  76                     .describedAs("message with invalid frame should be released")
 
  77                     .isEqualTo(expectedRefCnt)
 
  78             assertThat(msgWithTooBigPayload.refCnt())
 
  79                     .describedAs("message with payload exceeding 1MiB should be released")
 
  80                     .isEqualTo(expectedRefCnt)
 
  84         it("should release memory for each message with invalid payload") {
 
  85             val sink = StoringSink()
 
  87             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
  88             val validMessage = vesMessage(Domain.HVRANMEAS)
 
  89             val msgWithInvalidPayload = invalidVesMessage()
 
  90             val expectedRefCnt = 0
 
  92             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
 
  94             assertThat(handledEvents).hasSize(1)
 
  96             assertThat(validMessage.refCnt())
 
  97                     .describedAs("handled message should be released")
 
  98                     .isEqualTo(expectedRefCnt)
 
  99             assertThat(msgWithInvalidPayload.refCnt())
 
 100                     .describedAs("message with invalid payload should be released")
 
 101                     .isEqualTo(expectedRefCnt)
 
 105         it("should release memory for each message with garbage frame") {
 
 106             val sink = StoringSink()
 
 108             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 109             val validMessage = vesMessage(Domain.HVRANMEAS)
 
 110             val msgWithGarbageFrame = garbageFrame()
 
 111             val expectedRefCnt = 0
 
 113             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
 
 115             assertThat(handledEvents).hasSize(1)
 
 117             assertThat(validMessage.refCnt())
 
 118                     .describedAs("handled message should be released")
 
 119                     .isEqualTo(expectedRefCnt)
 
 120             assertThat(msgWithGarbageFrame.refCnt())
 
 121                     .describedAs("message with garbage frame should be released")
 
 122                     .isEqualTo(expectedRefCnt)
 
 127     describe("message routing") {
 
 128         it("should direct message to a topic by means of routing configuration") {
 
 129             val sink = StoringSink()
 
 131             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 133             val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
 
 134             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
 136             val msg = messages[0]
 
 137             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
 
 138             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
 
 141         it("should be able to direct 2 messages from different domains to one topic") {
 
 142             val sink = StoringSink()
 
 145             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
 
 147             val messages = sut.handleConnection(sink,
 
 148                     vesMessage(Domain.HVRANMEAS),
 
 149                     vesMessage(Domain.HEARTBEAT),
 
 150                     vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
 
 152             assertThat(messages).describedAs("number of routed messages").hasSize(3)
 
 154             assertThat(messages.get(0).topic).describedAs("first message topic")
 
 155                     .isEqualTo(HVRANMEAS_TOPIC)
 
 157             assertThat(messages.get(1).topic).describedAs("second message topic")
 
 158                     .isEqualTo(HVRANMEAS_TOPIC)
 
 160             assertThat(messages.get(2).topic).describedAs("last message topic")
 
 161                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
 
 164         it("should drop message if route was not found") {
 
 165             val sink = StoringSink()
 
 167             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 168             val messages = sut.handleConnection(sink,
 
 169                     vesMessage(Domain.OTHER, "first"),
 
 170                     vesMessage(Domain.HVRANMEAS, "second"),
 
 171                     vesMessage(Domain.HEARTBEAT, "third"))
 
 173             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
 175             val msg = messages[0]
 
 176             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
 
 177             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
 
 181     describe("request validation") {
 
 182         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
 
 183             val sink = StoringSink()
 
 185             sut.configurationProvider.updateConfiguration(basicConfiguration)
 
 187             val handledMessages = sut.handleConnection(sink,
 
 188                     vesMessage(Domain.HVRANMEAS, "first"),
 
 189                     vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
 
 190                     vesMessage(Domain.HVRANMEAS, "third"))
 
 192             assertThat(handledMessages).hasSize(1)
 
 193             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")