/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018-2019 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.core.None
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
28 import org.onap.dcae.collectors.veshv.config.api.model.Routing
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
33 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
35 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
36 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
37 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
39 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
40 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
42 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
43 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
/**
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 */
// Spek specification for the VES High Volume collector (component tests package).
// Every case obtains a (Sut, StoringSink) pair from vesHvWithStoringSink(), passes wire-frame
// messages to sut.handleConnection(...) and verifies the outcome with AssertJ assertions.
49 object VesHvSpecification : Spek({
// --- General collector behaviour: event handling and sink lifecycle ---
52 describe("VES High Volume Collector") {
// Two PERF3GPP frames are sent over a single connection.
53 it("should handle multiple HV RAN events") {
54 val (sut, sink) = vesHvWithStoringSink()
55 val messages = sut.handleConnection(sink,
56 vesWireFrameMessage(PERF3GPP),
57 vesWireFrameMessage(PERF3GPP)
// NOTE(review): the closing parenthesis of the call above, the assertion subject and the
// terminal check that follow describedAs(...) are not visible in this view - confirm against the full file.
61 .describedAs("should send all events")
// A connection that carries no messages must not leave the sink reported as closed.
65 it("should create sink lazily") {
66 val (sut, sink) = vesHvWithStoringSink()
68 // just connecting should not create sink
69 sut.handleConnection()
73 assertThat(sink.closed).isFalse()
// One valid PERF3GPP message is sent first so that the sink gets initialized (see the note below).
76 it("should close sink when closing collector provider") {
77 val (sut, sink) = vesHvWithStoringSink()
78 // given Sink initialized
79 // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
80 sut.handleConnection(vesWireFrameMessage(PERF3GPP))
// NOTE(review): the step that closes the collector provider is not visible between these two
// statements - confirm against the full file.
86 assertThat(sink.closed).isTrue()
// --- Reference counting: every input message must end with refCnt() == 0 ---
90 describe("Memory management") {
// Inputs: one valid message, one with an invalid wire-frame header, and one whose payload is
// one byte over MAX_PAYLOAD_SIZE_BYTES. Only one handled event is expected.
91 it("should release memory for each handled and dropped message") {
92 val (sut, sink) = vesHvWithStoringSink()
93 val validMessage = vesWireFrameMessage(PERF3GPP)
94 val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
95 val msgWithTooBigPayload = messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
96 val expectedRefCnt = 0
98 val handledEvents = sut.handleConnection(
99 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
101 assertThat(handledEvents).hasSize(1)
103 assertThat(validMessage.refCnt())
104 .describedAs("handled message should be released")
105 .isEqualTo(expectedRefCnt)
106 assertThat(msgWithInvalidFrame.refCnt())
107 .describedAs("message with invalid frame should be released")
108 .isEqualTo(expectedRefCnt)
109 assertThat(msgWithTooBigPayload.refCnt())
110 .describedAs("message with payload exceeding 1MiB should be released")
111 .isEqualTo(expectedRefCnt)
// Inputs: one valid message followed by a wire frame with an invalid payload.
// One handled event is expected and both messages must end with refCnt() == 0.
114 it("should release memory for each message with invalid payload") {
115 val (sut, sink) = vesHvWithStoringSink()
116 val validMessage = vesWireFrameMessage(PERF3GPP)
117 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
118 val expectedRefCnt = 0
120 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
122 assertThat(handledEvents).hasSize(1)
124 assertThat(validMessage.refCnt())
125 .describedAs("handled message should be released")
126 .isEqualTo(expectedRefCnt)
127 assertThat(msgWithInvalidPayload.refCnt())
128 .describedAs("message with invalid payload should be released")
129 .isEqualTo(expectedRefCnt)
// Inputs: one valid message followed by a garbage frame.
// One handled event is expected and both messages must end with refCnt() == 0.
133 it("should release memory for each message with garbage frame") {
134 val (sut, sink) = vesHvWithStoringSink()
135 val validMessage = vesWireFrameMessage(PERF3GPP)
136 val msgWithGarbageFrame = garbageFrame()
137 val expectedRefCnt = 0
139 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
141 assertThat(handledEvents).hasSize(1)
143 assertThat(validMessage.refCnt())
144 .describedAs("handled message should be released")
145 .isEqualTo(expectedRefCnt)
146 assertThat(msgWithGarbageFrame.refCnt())
147 .describedAs("message with garbage frame should be released")
148 .isEqualTo(expectedRefCnt)
// --- Routing: which topic a handled message is directed to ---
153 describe("message routing") {
// Uses the default routing of vesHvWithStoringSink(); a single PERF3GPP message is expected
// on PERF3GPP_TOPIC with partition equal to None.
154 it("should direct message to a topic by means of routing configuration") {
155 val (sut, sink) = vesHvWithStoringSink()
157 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
158 assertThat(messages).describedAs("number of routed messages").hasSize(1)
160 val msg = messages[0]
161 assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
162 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
// Uses twoDomainsToOneTopicRouting. Three messages are sent (PERF3GPP, HEARTBEAT, MEASUREMENT)
// and the results are checked positionally: the first two on PERF3GPP_TOPIC, the third on
// ALTERNATE_PERF3GPP_TOPIC.
165 it("should be able to direct 2 messages from different domains to one topic") {
166 val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)
168 val messages = sut.handleConnection(sink,
169 vesWireFrameMessage(PERF3GPP),
170 vesWireFrameMessage(HEARTBEAT),
171 vesWireFrameMessage(MEASUREMENT))
173 assertThat(messages).describedAs("number of routed messages").hasSize(3)
175 assertThat(messages[0].targetTopic).describedAs("first message topic")
176 .isEqualTo(PERF3GPP_TOPIC)
178 assertThat(messages[1].targetTopic).describedAs("second message topic")
179 .isEqualTo(PERF3GPP_TOPIC)
181 assertThat(messages[2].targetTopic).describedAs("last message topic")
182 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
// With the default routing, only the PERF3GPP event (eventId "second") is expected to be
// routed; the OTHER and HEARTBEAT events are expected to be absent from the result.
185 it("should drop message if route was not found") {
186 val (sut, sink) = vesHvWithStoringSink()
187 val messages = sut.handleConnection(sink,
188 vesWireFrameMessage(OTHER, "first"),
189 vesWireFrameMessage(PERF3GPP, "second"),
190 vesWireFrameMessage(HEARTBEAT, "third"))
192 assertThat(messages).describedAs("number of routed messages").hasSize(1)
194 val msg = messages[0]
195 assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
196 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
// --- Validation: oversized payloads ---
200 describe("request validation") {
// Sequence: valid "first", then a payload one byte over MAX_PAYLOAD_SIZE_BYTES, then another
// valid message. Only "first" is expected among the handled messages.
201 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
202 val (sut, sink) = vesHvWithStoringSink()
204 val handledMessages = sut.handleConnection(sink,
205 vesWireFrameMessage(PERF3GPP, "first"),
206 messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
207 vesWireFrameMessage(PERF3GPP))
209 assertThat(handledMessages).hasSize(1)
210 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
/**
 * Creates the system under test together with the sink it is constructed with.
 *
 * A new [StoringSink] is instantiated first, then a [Sut] is built from a
 * [CollectorConfiguration] wrapping [routing] and that sink.
 *
 * @param routing routing passed to the [CollectorConfiguration]; defaults to [basicRouting]
 * @return the created [Sut] paired with the [StoringSink] it was given
 */
private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> =
        StoringSink().let { storingSink ->
            Pair(Sut(CollectorConfiguration(routing), storingSink), storingSink)
        }