2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.core.None
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
29 import org.onap.dcae.collectors.veshv.config.api.model.Routing
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
35 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
36 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
37 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
38 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
40 import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
41 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
42 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
43 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
44 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
45 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
46 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
47 import reactor.core.publisher.Flux
48 import java.time.Duration
51 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
/**
 * Component-level Spek specification for the VES High Volume collector.
 *
 * Every test obtains a (Sut, StoringSink) pair from vesHvWithStoringSink(), feeds wire-frame
 * messages into sut.handleConnection(...) and asserts on the messages that call returns,
 * on the state of the sink, or on the reference counts of the messages that were sent.
 */
54 object VesHvSpecification : Spek({
// Basic event handling and the sink life cycle.
57 describe("VES High Volume Collector") {
// Two PERF3GPP frames are sent over a single connection; the check that follows is labelled
// "should send all events".
58 it("should handle multiple HV RAN events") {
59 val (sut, sink) = vesHvWithStoringSink()
60 val messages = sut.handleConnection(sink,
61 vesWireFrameMessage(PERF3GPP),
62 vesWireFrameMessage(PERF3GPP)
66 .describedAs("should send all events")
// A connection that carries no messages, followed by closing the collector, must leave
// sink.closed false.
70 it("should create sink lazily") {
71 val (sut, sink) = vesHvWithStoringSink()
73 // just connecting should not create sink
74 sut.handleConnection()
75 sut.close().unsafeRunSync()
78 assertThat(sink.closed).isFalse()
// Counterpart of the lazy-creation test: once a valid message has been sent, closing the
// collector is expected to flip sink.closed to true.
81 it("should close sink when closing collector provider") {
82 val (sut, sink) = vesHvWithStoringSink()
83 // given Sink initialized
84 // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
85 sut.handleConnection(vesWireFrameMessage(PERF3GPP))
88 sut.close().unsafeRunSync()
91 assertThat(sink.closed).isTrue()
// Each test below sends one valid message plus one or more malformed ones and then expects
// refCnt() == 0 on every message, whether it was handled or dropped.
// NOTE(review): refCnt() presumably is the Netty reference count of the message buffer - confirm.
95 describe("Memory management") {
// Valid frame + frame with an invalid wire-frame header + frame whose payload size is
// Sut.MAX_PAYLOAD_SIZE_BYTES + 1: exactly one event is handled and all three are released.
96 it("should release memory for each handled and dropped message") {
97 val (sut, sink) = vesHvWithStoringSink()
98 val validMessage = vesWireFrameMessage(PERF3GPP)
99 val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
100 val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
101 val expectedRefCnt = 0
103 val handledEvents = sut.handleConnection(
104 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
106 assertThat(handledEvents).hasSize(1)
108 assertThat(validMessage.refCnt())
109 .describedAs("handled message should be released")
110 .isEqualTo(expectedRefCnt)
111 assertThat(msgWithInvalidFrame.refCnt())
112 .describedAs("message with invalid frame should be released")
113 .isEqualTo(expectedRefCnt)
114 assertThat(msgWithTooBigPayload.refCnt())
115 .describedAs("message with payload exceeding 1MiB should be released")
116 .isEqualTo(expectedRefCnt)
// Valid frame + frame with an invalid payload: one event is handled, both are released.
119 it("should release memory for each message with invalid payload") {
120 val (sut, sink) = vesHvWithStoringSink()
121 val validMessage = vesWireFrameMessage(PERF3GPP)
122 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
123 val expectedRefCnt = 0
125 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
127 assertThat(handledEvents).hasSize(1)
129 assertThat(validMessage.refCnt())
130 .describedAs("handled message should be released")
131 .isEqualTo(expectedRefCnt)
132 assertThat(msgWithInvalidPayload.refCnt())
133 .describedAs("message with invalid payload should be released")
134 .isEqualTo(expectedRefCnt)
// Valid frame + garbage frame: one event is handled, both are released.
138 it("should release memory for each message with garbage frame") {
139 val (sut, sink) = vesHvWithStoringSink()
140 val validMessage = vesWireFrameMessage(PERF3GPP)
141 val msgWithGarbageFrame = garbageFrame()
142 val expectedRefCnt = 0
144 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
146 assertThat(handledEvents).hasSize(1)
148 assertThat(validMessage.refCnt())
149 .describedAs("handled message should be released")
150 .isEqualTo(expectedRefCnt)
151 assertThat(msgWithGarbageFrame.refCnt())
152 .describedAs("message with garbage frame should be released")
153 .isEqualTo(expectedRefCnt)
// Checks which target topic each handled message ends up with for a given routing.
158 describe("message routing") {
// With the default routing (basicRouting), a single PERF3GPP message is expected on
// PERF3GPP_TOPIC with partition None.
159 it("should direct message to a topic by means of routing configuration") {
160 val (sut, sink) = vesHvWithStoringSink()
162 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
163 assertThat(messages).describedAs("number of routed messages").hasSize(1)
165 val msg = messages[0]
166 assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
167 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
// With twoDomainsToOneTopicRouting, the PERF3GPP and HEARTBEAT messages are expected on
// PERF3GPP_TOPIC and the MEASUREMENT message on ALTERNATE_PERF3GPP_TOPIC. The index-based
// assertions also rely on the results coming back in the order the messages were sent.
170 it("should be able to direct 2 messages from different domains to one topic") {
171 val (sut, sink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)
173 val messages = sut.handleConnection(sink,
174 vesWireFrameMessage(PERF3GPP),
175 vesWireFrameMessage(HEARTBEAT),
176 vesWireFrameMessage(MEASUREMENT))
178 assertThat(messages).describedAs("number of routed messages").hasSize(3)
180 assertThat(messages[0].targetTopic).describedAs("first message topic")
181 .isEqualTo(PERF3GPP_TOPIC)
183 assertThat(messages[1].targetTopic).describedAs("second message topic")
184 .isEqualTo(PERF3GPP_TOPIC)
186 assertThat(messages[2].targetTopic).describedAs("last message topic")
187 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
// With the default routing, only the PERF3GPP message (eventId "second") is expected to be
// routed; the OTHER and HEARTBEAT messages sent around it must not appear in the result.
190 it("should drop message if route was not found") {
191 val (sut, sink) = vesHvWithStoringSink()
192 val messages = sut.handleConnection(sink,
193 vesWireFrameMessage(OTHER, "first"),
194 vesWireFrameMessage(PERF3GPP, "second"),
195 vesWireFrameMessage(HEARTBEAT, "third"))
197 assertThat(messages).describedAs("number of routed messages").hasSize(1)
199 val msg = messages[0]
200 assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
201 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
// Checks how an oversized payload affects the rest of the connection.
205 describe("request validation") {
// An oversized message (Sut.MAX_PAYLOAD_SIZE_BYTES + 1) is sent between two valid ones;
// only the message with eventId "first" is expected to be handled, so the valid message
// that follows the oversized one is rejected as well.
206 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
207 val (sut, sink) = vesHvWithStoringSink()
209 val handledMessages = sut.handleConnection(sink,
210 vesWireFrameMessage(PERF3GPP, "first"),
211 messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
212 vesWireFrameMessage(PERF3GPP))
214 assertThat(handledMessages).hasSize(1)
215 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
/**
 * Creates a [Sut] wired to a fresh [StoringSink] and returns both.
 *
 * @param routing routing wrapped in the [CollectorConfiguration] handed to the [Sut];
 * defaults to `basicRouting`.
 * @return the created [Sut] paired with the [StoringSink] instance it was constructed with,
 * so a test can destructure the pair and inspect the sink afterwards.
 */
221 private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
222 val sink = StoringSink()
223 val sut = Sut(CollectorConfiguration(routing), sink)
224 return Pair(sut, sink)