2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.*
42 import reactor.core.publisher.Flux
43 import java.time.Duration
46 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
49 object VesHvSpecification : Spek({
52 describe("VES High Volume Collector") {
53 it("should handle multiple HV RAN events") {
54 val (sut, sink) = vesHvWithStoringSink()
55 val messages = sut.handleConnection(sink,
56 vesWireFrameMessage(PERF3GPP),
57 vesWireFrameMessage(PERF3GPP)
61 .describedAs("should send all events")
65 it("should close sink when closing collector provider") {
66 val (sut, _) = vesHvWithStoringSink()
70 assertThat(sut.sinkProvider.closed).isTrue()
74 describe("Memory management") {
75 it("should release memory for each handled and dropped message") {
76 val (sut, sink) = vesHvWithStoringSink()
77 val validMessage = vesWireFrameMessage(PERF3GPP)
78 val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
79 val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
80 val expectedRefCnt = 0
82 val handledEvents = sut.handleConnection(
83 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
85 assertThat(handledEvents).hasSize(1)
87 assertThat(validMessage.refCnt())
88 .describedAs("handled message should be released")
89 .isEqualTo(expectedRefCnt)
90 assertThat(msgWithInvalidFrame.refCnt())
91 .describedAs("message with invalid frame should be released")
92 .isEqualTo(expectedRefCnt)
93 assertThat(msgWithTooBigPayload.refCnt())
94 .describedAs("message with payload exceeding 1MiB should be released")
95 .isEqualTo(expectedRefCnt)
98 it("should release memory for each message with invalid payload") {
99 val (sut, sink) = vesHvWithStoringSink()
100 val validMessage = vesWireFrameMessage(PERF3GPP)
101 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
102 val expectedRefCnt = 0
104 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
106 assertThat(handledEvents).hasSize(1)
108 assertThat(validMessage.refCnt())
109 .describedAs("handled message should be released")
110 .isEqualTo(expectedRefCnt)
111 assertThat(msgWithInvalidPayload.refCnt())
112 .describedAs("message with invalid payload should be released")
113 .isEqualTo(expectedRefCnt)
117 it("should release memory for each message with garbage frame") {
118 val (sut, sink) = vesHvWithStoringSink()
119 val validMessage = vesWireFrameMessage(PERF3GPP)
120 val msgWithGarbageFrame = garbageFrame()
121 val expectedRefCnt = 0
123 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
125 assertThat(handledEvents).hasSize(1)
127 assertThat(validMessage.refCnt())
128 .describedAs("handled message should be released")
129 .isEqualTo(expectedRefCnt)
130 assertThat(msgWithGarbageFrame.refCnt())
131 .describedAs("message with garbage frame should be released")
132 .isEqualTo(expectedRefCnt)
137 describe("message routing") {
138 it("should direct message to a topic by means of routing configuration") {
139 val (sut, sink) = vesHvWithStoringSink()
141 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
142 assertThat(messages).describedAs("number of routed messages").hasSize(1)
144 val msg = messages[0]
145 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
146 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
149 it("should be able to direct 2 messages from different domains to one topic") {
150 val (sut, sink) = vesHvWithStoringSink()
152 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
154 val messages = sut.handleConnection(sink,
155 vesWireFrameMessage(PERF3GPP),
156 vesWireFrameMessage(HEARTBEAT),
157 vesWireFrameMessage(MEASUREMENT))
159 assertThat(messages).describedAs("number of routed messages").hasSize(3)
161 assertThat(messages[0].topic).describedAs("first message topic")
162 .isEqualTo(PERF3GPP_TOPIC)
164 assertThat(messages[1].topic).describedAs("second message topic")
165 .isEqualTo(PERF3GPP_TOPIC)
167 assertThat(messages[2].topic).describedAs("last message topic")
168 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
171 it("should drop message if route was not found") {
172 val (sut, sink) = vesHvWithStoringSink()
173 val messages = sut.handleConnection(sink,
174 vesWireFrameMessage(OTHER, "first"),
175 vesWireFrameMessage(PERF3GPP, "second"),
176 vesWireFrameMessage(HEARTBEAT, "third"))
178 assertThat(messages).describedAs("number of routed messages").hasSize(1)
180 val msg = messages[0]
181 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
182 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
186 describe("configuration update") {
188 val defaultTimeout = Duration.ofSeconds(10)
190 given("successful configuration change") {
192 lateinit var sut: Sut
193 lateinit var sink: StoringSink
196 vesHvWithStoringSink().run {
202 it("should update collector") {
203 val firstCollector = sut.collector
205 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
206 val collectorAfterUpdate = sut.collector
208 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
211 it("should start routing messages") {
213 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
215 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
216 assertThat(messages).isEmpty()
218 sut.configurationProvider.updateConfiguration(basicConfiguration)
220 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
221 assertThat(messagesAfterUpdate).hasSize(1)
222 val message = messagesAfterUpdate[0]
224 assertThat(message.topic).describedAs("routed message topic after configuration's change")
225 .isEqualTo(PERF3GPP_TOPIC)
226 assertThat(message.partition).describedAs("routed message partition")
230 it("should change domain routing") {
232 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
233 assertThat(messages).hasSize(1)
234 val firstMessage = messages[0]
236 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
237 .isEqualTo(PERF3GPP_TOPIC)
238 assertThat(firstMessage.partition).describedAs("routed message partition")
242 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
244 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
245 assertThat(messagesAfterUpdate).hasSize(2)
246 val secondMessage = messagesAfterUpdate[1]
248 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
249 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
250 assertThat(secondMessage.partition).describedAs("routed message partition")
254 it("should update routing for each client sending one message") {
256 val messagesAmount = 10
257 val messagesForEachTopic = 5
259 Flux.range(0, messagesAmount).doOnNext {
260 if (it == messagesForEachTopic) {
261 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
264 sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
265 }.then().block(defaultTimeout)
268 val messages = sink.sentMessages
269 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
270 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
272 assertThat(messages.size).isEqualTo(messagesAmount)
273 assertThat(messagesForEachTopic)
274 .describedAs("amount of messages routed to each topic")
275 .isEqualTo(firstTopicMessagesCount)
276 .isEqualTo(secondTopicMessagesCount)
279 it("should not update routing for client sending continuous stream of messages") {
281 val messageStreamSize = 10
284 val incomingMessages = Flux.range(0, messageStreamSize)
287 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
288 println("config changed")
291 .map { vesWireFrameMessage(PERF3GPP) }
294 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
296 val messages = sink.sentMessages
297 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
298 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
300 assertThat(messages.size).isEqualTo(messageStreamSize)
301 assertThat(firstTopicMessagesCount)
302 .describedAs("amount of messages routed to first topic")
303 .isEqualTo(messageStreamSize)
305 assertThat(secondTopicMessagesCount)
306 .describedAs("amount of messages routed to second topic")
310 it("should mark the application healthy") {
311 assertThat(sut.healthStateProvider.currentHealth)
312 .describedAs("application health state")
313 .isEqualTo(HealthDescription.HEALTHY)
317 given("failed configuration change") {
318 val (sut, _) = vesHvWithStoringSink()
319 sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
320 sut.configurationProvider.updateConfiguration(basicConfiguration)
322 it("should mark the application unhealthy ") {
323 assertThat(sut.healthStateProvider.currentHealth)
324 .describedAs("application health state")
325 .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
330 describe("request validation") {
331 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
332 val (sut, sink) = vesHvWithStoringSink()
334 val handledMessages = sut.handleConnection(sink,
335 vesWireFrameMessage(PERF3GPP, "first"),
336 messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
337 vesWireFrameMessage(PERF3GPP))
339 assertThat(handledMessages).hasSize(1)
340 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
346 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
347 val sink = StoringSink()
349 sut.configurationProvider.updateConfiguration(basicConfiguration)
350 return Pair(sut, sink)