2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018-2019 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import arrow.core.None
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
32 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
33 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
37 import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
43 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
45 import reactor.core.publisher.Flux
46 import java.time.Duration
49 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
52 object VesHvSpecification : Spek({
55 describe("VES High Volume Collector") {
56 it("should handle multiple HV RAN events") {
57 val (sut, sink) = vesHvWithStoringSink()
58 val messages = sut.handleConnection(sink,
59 vesWireFrameMessage(PERF3GPP),
60 vesWireFrameMessage(PERF3GPP)
64 .describedAs("should send all events")
68 it("should create sink lazily") {
69 val (sut, sink) = vesHvWithStoringSink()
71 // just connecting should not create sink
72 sut.handleConnection()
73 sut.close().unsafeRunSync()
76 assertThat(sink.closed).isFalse()
79 it("should close sink when closing collector provider") {
80 val (sut, sink) = vesHvWithStoringSink()
81 // given Sink initialized
82 // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
83 sut.handleConnection(vesWireFrameMessage(PERF3GPP))
86 sut.close().unsafeRunSync()
89 assertThat(sink.closed).isTrue()
93 describe("Memory management") {
94 it("should release memory for each handled and dropped message") {
95 val (sut, sink) = vesHvWithStoringSink()
96 val validMessage = vesWireFrameMessage(PERF3GPP)
97 val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
98 val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
99 val expectedRefCnt = 0
101 val handledEvents = sut.handleConnection(
102 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
104 assertThat(handledEvents).hasSize(1)
106 assertThat(validMessage.refCnt())
107 .describedAs("handled message should be released")
108 .isEqualTo(expectedRefCnt)
109 assertThat(msgWithInvalidFrame.refCnt())
110 .describedAs("message with invalid frame should be released")
111 .isEqualTo(expectedRefCnt)
112 assertThat(msgWithTooBigPayload.refCnt())
113 .describedAs("message with payload exceeding 1MiB should be released")
114 .isEqualTo(expectedRefCnt)
117 it("should release memory for each message with invalid payload") {
118 val (sut, sink) = vesHvWithStoringSink()
119 val validMessage = vesWireFrameMessage(PERF3GPP)
120 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
121 val expectedRefCnt = 0
123 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
125 assertThat(handledEvents).hasSize(1)
127 assertThat(validMessage.refCnt())
128 .describedAs("handled message should be released")
129 .isEqualTo(expectedRefCnt)
130 assertThat(msgWithInvalidPayload.refCnt())
131 .describedAs("message with invalid payload should be released")
132 .isEqualTo(expectedRefCnt)
136 it("should release memory for each message with garbage frame") {
137 val (sut, sink) = vesHvWithStoringSink()
138 val validMessage = vesWireFrameMessage(PERF3GPP)
139 val msgWithGarbageFrame = garbageFrame()
140 val expectedRefCnt = 0
142 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
144 assertThat(handledEvents).hasSize(1)
146 assertThat(validMessage.refCnt())
147 .describedAs("handled message should be released")
148 .isEqualTo(expectedRefCnt)
149 assertThat(msgWithGarbageFrame.refCnt())
150 .describedAs("message with garbage frame should be released")
151 .isEqualTo(expectedRefCnt)
156 describe("message routing") {
157 it("should direct message to a topic by means of routing configuration") {
158 val (sut, sink) = vesHvWithStoringSink()
160 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
161 assertThat(messages).describedAs("number of routed messages").hasSize(1)
163 val msg = messages[0]
164 assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
165 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
168 it("should be able to direct 2 messages from different domains to one topic") {
169 val (sut, sink) = vesHvWithStoringSink()
171 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
173 val messages = sut.handleConnection(sink,
174 vesWireFrameMessage(PERF3GPP),
175 vesWireFrameMessage(HEARTBEAT),
176 vesWireFrameMessage(MEASUREMENT))
178 assertThat(messages).describedAs("number of routed messages").hasSize(3)
180 assertThat(messages[0].targetTopic).describedAs("first message topic")
181 .isEqualTo(PERF3GPP_TOPIC)
183 assertThat(messages[1].targetTopic).describedAs("second message topic")
184 .isEqualTo(PERF3GPP_TOPIC)
186 assertThat(messages[2].targetTopic).describedAs("last message topic")
187 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
190 it("should drop message if route was not found") {
191 val (sut, sink) = vesHvWithStoringSink()
192 val messages = sut.handleConnection(sink,
193 vesWireFrameMessage(OTHER, "first"),
194 vesWireFrameMessage(PERF3GPP, "second"),
195 vesWireFrameMessage(HEARTBEAT, "third"))
197 assertThat(messages).describedAs("number of routed messages").hasSize(1)
199 val msg = messages[0]
200 assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
201 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
205 describe("configuration update") {
207 val defaultTimeout = Duration.ofSeconds(10)
209 given("successful configuration change") {
211 lateinit var sut: Sut
212 lateinit var sink: StoringSink
215 vesHvWithStoringSink().run {
221 it("should update collector") {
222 val firstCollector = sut.collector
224 sut.configurationProvider.updateConfiguration(alternativeRouting)
225 val collectorAfterUpdate = sut.collector
227 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
230 it("should start routing messages") {
232 sut.configurationProvider.updateConfiguration(emptyRouting)
234 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
235 assertThat(messages).isEmpty()
237 sut.configurationProvider.updateConfiguration(basicRouting)
239 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
240 assertThat(messagesAfterUpdate).hasSize(1)
241 val message = messagesAfterUpdate[0]
243 assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
244 .isEqualTo(PERF3GPP_TOPIC)
245 assertThat(message.partition).describedAs("routed message partition")
249 it("should change domain routing") {
251 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
252 assertThat(messages).hasSize(1)
253 val firstMessage = messages[0]
255 assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
256 .isEqualTo(PERF3GPP_TOPIC)
257 assertThat(firstMessage.partition).describedAs("routed message partition")
261 sut.configurationProvider.updateConfiguration(alternativeRouting)
263 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
264 assertThat(messagesAfterUpdate).hasSize(2)
265 val secondMessage = messagesAfterUpdate[1]
267 assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
268 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
269 assertThat(secondMessage.partition).describedAs("routed message partition")
273 it("should update routing for each client sending one message") {
275 val messagesAmount = 10
276 val messagesForEachTopic = 5
278 Flux.range(0, messagesAmount).doOnNext {
279 if (it == messagesForEachTopic) {
280 sut.configurationProvider.updateConfiguration(alternativeRouting)
283 sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
284 }.then().block(defaultTimeout)
287 val messages = sink.sentMessages
288 val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
289 val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
291 assertThat(messages.size).isEqualTo(messagesAmount)
292 assertThat(messagesForEachTopic)
293 .describedAs("amount of messages routed to each topic")
294 .isEqualTo(firstTopicMessagesCount)
295 .isEqualTo(secondTopicMessagesCount)
298 it("should not update routing for client sending continuous stream of messages") {
300 val messageStreamSize = 10
303 val incomingMessages = Flux.range(0, messageStreamSize)
306 sut.configurationProvider.updateConfiguration(alternativeRouting)
307 println("config changed")
310 .map { vesWireFrameMessage(PERF3GPP) }
313 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
315 val messages = sink.sentMessages
316 val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
317 val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
319 assertThat(messages.size).isEqualTo(messageStreamSize)
320 assertThat(firstTopicMessagesCount)
321 .describedAs("amount of messages routed to first topic")
322 .isEqualTo(messageStreamSize)
324 assertThat(secondTopicMessagesCount)
325 .describedAs("amount of messages routed to second topic")
329 it("should mark the application healthy") {
330 assertThat(sut.healthStateProvider.currentHealth)
331 .describedAs("application health state")
332 .isEqualTo(HealthDescription.HEALTHY)
336 given("failed configuration change") {
337 val (sut, _) = vesHvWithStoringSink()
338 sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
339 sut.configurationProvider.updateConfiguration(basicRouting)
341 it("should mark the application unhealthy ") {
342 assertThat(sut.healthStateProvider.currentHealth)
343 .describedAs("application health state")
344 .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
349 describe("request validation") {
350 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
351 val (sut, sink) = vesHvWithStoringSink()
353 val handledMessages = sut.handleConnection(sink,
354 vesWireFrameMessage(PERF3GPP, "first"),
355 messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
356 vesWireFrameMessage(PERF3GPP))
358 assertThat(handledMessages).hasSize(1)
359 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
365 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
366 val sink = StoringSink()
368 sut.configurationProvider.updateConfiguration(basicRouting)
369 return Pair(sut, sink)