2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
27 import org.onap.dcae.collectors.veshv.model.Routing
28 import org.onap.dcae.collectors.veshv.model.routing
29 import org.onap.dcae.collectors.veshv.tests.fakes.*
30 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
33 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification that drives the collector through `sut.handleConnection(...)`:
// each case applies a configuration via `sut.configurationProvider`, feeds wire
// messages together with a StoringSink, and asserts on what is returned.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is created
// per test case; confirm against the full file.
36 object VesHvSpecification : Spek({
39 describe("VES High Volume Collector") {
// Sends two messages in the HVRANMEAS domain over a single connection
// while basicConfiguration is active.
40 it("should handle multiple HV RAN events") {
41 val sink = StoringSink()
43 sut.configurationProvider.updateConfiguration(basicConfiguration)
44 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
47 .describedAs("should send all events")
// The cases in this group inspect `refCnt()` of every input message after the
// connection has been handled; the expected value is 0 for all of them.
52 describe("Memory management") {
// One valid message plus three that are expected to be dropped (invalid domain,
// invalid wire frame, too big payload) go through the same connection.
54 it("should release memory for each handled and dropped message") {
55 val sink = StoringSink()
57 sut.configurationProvider.updateConfiguration(basicConfiguration)
58 val validMessage = vesMessage(Domain.HVRANMEAS)
59 val msgWithInvalidDomain = vesMessage(Domain.OTHER)
60 val msgWithInvalidFrame = invalidWireFrame()
61 val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
62 val expectedRefCnt = 0
64 val handledEvents = sut.handleConnection(
65 sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
// Only the valid message is expected among the handled events.
67 assertThat(handledEvents).hasSize(1)
// Every input, handled or not, must end up with the expected reference count.
69 assertThat(validMessage.refCnt())
70 .describedAs("handled message should be released")
71 .isEqualTo(expectedRefCnt)
72 assertThat(msgWithInvalidDomain.refCnt())
73 .describedAs("message with invalid domain should be released")
74 .isEqualTo(expectedRefCnt)
75 assertThat(msgWithInvalidFrame.refCnt())
76 .describedAs("message with invalid frame should be released")
77 .isEqualTo(expectedRefCnt)
78 assertThat(msgWithTooBigPayload.refCnt())
79 .describedAs("message with payload exceeding 1MiB should be released")
80 .isEqualTo(expectedRefCnt)
// Pairs a valid message with one produced by invalidVesMessage() and checks that
// both have refCnt() == 0 afterwards.
84 it("should release memory for each message with invalid payload") {
85 val sink = StoringSink()
87 sut.configurationProvider.updateConfiguration(basicConfiguration)
88 val validMessage = vesMessage(Domain.HVRANMEAS)
89 val msgWithInvalidPayload = invalidVesMessage()
90 val expectedRefCnt = 0
92 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
// Only one of the two inputs is expected to be handled.
94 assertThat(handledEvents).hasSize(1)
96 assertThat(validMessage.refCnt())
97 .describedAs("handled message should be released")
98 .isEqualTo(expectedRefCnt)
99 assertThat(msgWithInvalidPayload.refCnt())
100 .describedAs("message with invalid payload should be released")
101 .isEqualTo(expectedRefCnt)
// Pairs a valid message with one produced by garbageFrame() and checks that
// both have refCnt() == 0 afterwards.
105 it("should release memory for each message with garbage frame") {
106 val sink = StoringSink()
108 sut.configurationProvider.updateConfiguration(basicConfiguration)
109 val validMessage = vesMessage(Domain.HVRANMEAS)
110 val msgWithGarbageFrame = garbageFrame()
111 val expectedRefCnt = 0
113 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
// Only one of the two inputs is expected to be handled.
115 assertThat(handledEvents).hasSize(1)
117 assertThat(validMessage.refCnt())
118 .describedAs("handled message should be released")
119 .isEqualTo(expectedRefCnt)
120 assertThat(msgWithGarbageFrame.refCnt())
121 .describedAs("message with garbage frame should be released")
122 .isEqualTo(expectedRefCnt)
// The cases in this group assert on the topic (and, in the first case, the partition)
// of the messages returned by handleConnection.
127 describe("message routing") {
// A single HVRANMEAS message under basicConfiguration is expected to come back
// exactly once, on HVRANMEAS_TOPIC and partition 0.
128 it("should direct message to a topic by means of routing configuration") {
129 val sink = StoringSink()
131 sut.configurationProvider.updateConfiguration(basicConfiguration)
133 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
134 assertThat(messages).describedAs("number of routed messages").hasSize(1)
136 val msg = messages[0]
137 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
138 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
// Uses twoDomainsToOneTopicConfiguration instead of basicConfiguration.
// Expected outcome, in input order: HVRANMEAS and HEARTBEAT both land on
// HVRANMEAS_TOPIC, MEASUREMENTS_FOR_VF_SCALING lands on its own topic.
141 it("should be able to direct 2 messages from different domains to one topic") {
142 val sink = StoringSink()
145 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
147 val messages = sut.handleConnection(sink,
148 vesMessage(Domain.HVRANMEAS),
149 vesMessage(Domain.HEARTBEAT),
150 vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
// All three inputs are expected to be routed.
152 assertThat(messages).describedAs("number of routed messages").hasSize(3)
154 assertThat(messages.get(0).topic).describedAs("first message topic")
155 .isEqualTo(HVRANMEAS_TOPIC)
157 assertThat(messages.get(1).topic).describedAs("second message topic")
158 .isEqualTo(HVRANMEAS_TOPIC)
160 assertThat(messages.get(2).topic).describedAs("last message topic")
161 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
// Three messages from different domains are sent under basicConfiguration; the second
// argument to vesMessage is later compared against header.eventId, so it labels each message.
// Only the HVRANMEAS one ("second") is expected to be routed.
164 it("should drop message if route was not found") {
165 val sink = StoringSink()
167 sut.configurationProvider.updateConfiguration(basicConfiguration)
168 val messages = sut.handleConnection(sink,
169 vesMessage(Domain.OTHER, "first"),
170 vesMessage(Domain.HVRANMEAS, "second"),
171 vesMessage(Domain.HEARTBEAT, "third"))
173 assertThat(messages).describedAs("number of routed messages").hasSize(1)
// The surviving message is identified by its event id rather than by position.
175 val msg = messages[0]
176 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
177 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
181 describe("request validation") {
// The oversized message sits between two regular HVRANMEAS messages. Only "first" is
// expected to be handled, i.e. the regular "third" message that follows the oversized
// one is expected to be rejected as well.
182 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
183 val sink = StoringSink()
185 sut.configurationProvider.updateConfiguration(basicConfiguration)
187 val handledMessages = sut.handleConnection(sink,
188 vesMessage(Domain.HVRANMEAS, "first"),
189 vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
190 vesMessage(Domain.HVRANMEAS, "third"))
192 assertThat(handledMessages).hasSize(1)
193 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")