2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
37 import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
43 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
45 import reactor.core.publisher.Flux
46 import java.time.Duration
49 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification for the VES HV collector, driven through the Sut harness and a StoringSink.
52 object VesHvSpecification : Spek({
55 describe("VES High Volume Collector") {
// Pushes two PERF3GPP wire frames through one connection.
56 it("should handle multiple HV RAN events") {
57 val (sut, sink) = vesHvWithStoringSink()
58 val messages = sut.handleConnection(sink,
59 vesWireFrameMessage(PERF3GPP),
60 vesWireFrameMessage(PERF3GPP)
64 .describedAs("should send all events")
// Expects sut.sinkProvider.closed to be true by the end of the case.
68 it("should close sink when closing collector provider") {
69 val (sut, _) = vesHvWithStoringSink()
73 assertThat(sut.sinkProvider.closed).isTrue()
// Every case in this group asserts that refCnt() of each input message equals 0 (expectedRefCnt)
// after handleConnection has returned, for handled and rejected messages alike.
77 describe("Memory management") {
// Inputs: one valid PERF3GPP frame, one frame with an invalid wire frame header, and one frame whose
// payload is Sut.MAX_PAYLOAD_SIZE_BYTES + 1 bytes. Exactly one event is expected to be handled.
78 it("should release memory for each handled and dropped message") {
79 val (sut, sink) = vesHvWithStoringSink()
80 val validMessage = vesWireFrameMessage(PERF3GPP)
81 val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
82 val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
83 val expectedRefCnt = 0
85 val handledEvents = sut.handleConnection(
86 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
88 assertThat(handledEvents).hasSize(1)
90 assertThat(validMessage.refCnt())
91 .describedAs("handled message should be released")
92 .isEqualTo(expectedRefCnt)
93 assertThat(msgWithInvalidFrame.refCnt())
94 .describedAs("message with invalid frame should be released")
95 .isEqualTo(expectedRefCnt)
96 assertThat(msgWithTooBigPayload.refCnt())
97 .describedAs("message with payload exceeding 1MiB should be released")
98 .isEqualTo(expectedRefCnt)
// Inputs: one valid PERF3GPP frame followed by a wire frame with an invalid payload.
// Exactly one event is expected to be handled.
101 it("should release memory for each message with invalid payload") {
102 val (sut, sink) = vesHvWithStoringSink()
103 val validMessage = vesWireFrameMessage(PERF3GPP)
104 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
105 val expectedRefCnt = 0
107 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
109 assertThat(handledEvents).hasSize(1)
111 assertThat(validMessage.refCnt())
112 .describedAs("handled message should be released")
113 .isEqualTo(expectedRefCnt)
114 assertThat(msgWithInvalidPayload.refCnt())
115 .describedAs("message with invalid payload should be released")
116 .isEqualTo(expectedRefCnt)
// Inputs: one valid PERF3GPP frame followed by a garbage frame.
// Exactly one event is expected to be handled.
120 it("should release memory for each message with garbage frame") {
121 val (sut, sink) = vesHvWithStoringSink()
122 val validMessage = vesWireFrameMessage(PERF3GPP)
123 val msgWithGarbageFrame = garbageFrame()
124 val expectedRefCnt = 0
126 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
128 assertThat(handledEvents).hasSize(1)
130 assertThat(validMessage.refCnt())
131 .describedAs("handled message should be released")
132 .isEqualTo(expectedRefCnt)
133 assertThat(msgWithGarbageFrame.refCnt())
134 .describedAs("message with garbage frame should be released")
135 .isEqualTo(expectedRefCnt)
// Routing cases start from the configuration that vesHvWithStoringSink() applies (configWithBasicRouting).
140 describe("message routing") {
// A single PERF3GPP message is expected on PERF3GPP_TOPIC, partition 0.
141 it("should direct message to a topic by means of routing configuration") {
142 val (sut, sink) = vesHvWithStoringSink()
144 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
145 assertThat(messages).describedAs("number of routed messages").hasSize(1)
147 val msg = messages[0]
148 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
149 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
// After switching to configWithTwoDomainsToOneTopicRouting, the PERF3GPP and HEARTBEAT messages are both
// expected on PERF3GPP_TOPIC, while the MEASUREMENT message is expected on MEASUREMENTS_FOR_VF_SCALING_TOPIC.
152 it("should be able to direct 2 messages from different domains to one topic") {
153 val (sut, sink) = vesHvWithStoringSink()
155 sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
157 val messages = sut.handleConnection(sink,
158 vesWireFrameMessage(PERF3GPP),
159 vesWireFrameMessage(HEARTBEAT),
160 vesWireFrameMessage(MEASUREMENT))
162 assertThat(messages).describedAs("number of routed messages").hasSize(3)
164 assertThat(messages[0].topic).describedAs("first message topic")
165 .isEqualTo(PERF3GPP_TOPIC)
167 assertThat(messages[1].topic).describedAs("second message topic")
168 .isEqualTo(PERF3GPP_TOPIC)
170 assertThat(messages[2].topic).describedAs("last message topic")
171 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
// Of the three messages sent (OTHER, PERF3GPP, HEARTBEAT) only the PERF3GPP one, with eventId "second",
// is expected to be routed; the other two are expected to be dropped.
174 it("should drop message if route was not found") {
175 val (sut, sink) = vesHvWithStoringSink()
176 val messages = sut.handleConnection(sink,
177 vesWireFrameMessage(OTHER, "first"),
178 vesWireFrameMessage(PERF3GPP, "second"),
179 vesWireFrameMessage(HEARTBEAT, "third"))
181 assertThat(messages).describedAs("number of routed messages").hasSize(1)
183 val msg = messages[0]
184 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
185 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
// Cases covering what happens when sut.configurationProvider.updateConfiguration(...) is called.
189 describe("configuration update") {
// Upper bound passed to the blocking Reactor calls (block(defaultTimeout)) in this group.
191 val defaultTimeout = Duration.ofSeconds(10)
193 given("successful configuration change") {
// Fixture shared by the cases of this group.
// NOTE(review): presumably assigned from the pair returned by vesHvWithStoringSink() before each case - confirm.
195 lateinit var sut: Sut
196 lateinit var sink: StoringSink
199 vesHvWithStoringSink().run {
// sut.collector must return a different instance once configWithDifferentRouting has been applied.
205 it("should update collector") {
206 val firstCollector = sut.collector
208 sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
209 val collectorAfterUpdate = sut.collector
211 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
// With configWithEmptyRouting nothing is expected to be routed; after configWithBasicRouting is applied
// one message is expected on PERF3GPP_TOPIC.
214 it("should start routing messages") {
216 sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
218 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
219 assertThat(messages).isEmpty()
221 sut.configurationProvider.updateConfiguration(configWithBasicRouting)
223 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
224 assertThat(messagesAfterUpdate).hasSize(1)
225 val message = messagesAfterUpdate[0]
227 assertThat(message.topic).describedAs("routed message topic after configuration's change")
228 .isEqualTo(PERF3GPP_TOPIC)
229 assertThat(message.partition).describedAs("routed message partition")
// The first message is expected on PERF3GPP_TOPIC. After configWithDifferentRouting is applied, the second
// handleConnection call is expected to yield two messages, the one at index 1 being on ALTERNATE_PERF3GPP_TOPIC.
// NOTE(review): the size of 2 suggests results accumulate in the shared sink across calls - confirm.
233 it("should change domain routing") {
235 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
236 assertThat(messages).hasSize(1)
237 val firstMessage = messages[0]
239 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
240 .isEqualTo(PERF3GPP_TOPIC)
241 assertThat(firstMessage.partition).describedAs("routed message partition")
245 sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
247 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
248 assertThat(messagesAfterUpdate).hasSize(2)
249 val secondMessage = messagesAfterUpdate[1]
251 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
252 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
253 assertThat(secondMessage.partition).describedAs("routed message partition")
// Opens messagesAmount (10) connections carrying one PERF3GPP message each and applies
// configWithDifferentRouting when the emitted index equals messagesForEachTopic (5).
// Expects 10 stored messages in total, 5 on PERF3GPP_TOPIC and 5 on ALTERNATE_PERF3GPP_TOPIC.
257 it("should update routing for each client sending one message") {
259 val messagesAmount = 10
260 val messagesForEachTopic = 5
262 Flux.range(0, messagesAmount).doOnNext {
263 if (it == messagesForEachTopic) {
264 sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
267 sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
268 }.then().block(defaultTimeout)
271 val messages = sink.sentMessages
272 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
273 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
275 assertThat(messages.size).isEqualTo(messagesAmount)
276 assertThat(messagesForEachTopic)
277 .describedAs("amount of messages routed to each topic")
278 .isEqualTo(firstTopicMessagesCount)
279 .isEqualTo(secondTopicMessagesCount)
// A single connection carries all messageStreamSize (10) messages, and configWithDifferentRouting is applied
// inside the incomingMessages pipeline. All 10 messages are still expected on PERF3GPP_TOPIC.
282 it("should not update routing for client sending continuous stream of messages") {
284 val messageStreamSize = 10
287 val incomingMessages = Flux.range(0, messageStreamSize)
290 sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
291 println("config changed")
294 .map { vesWireFrameMessage(PERF3GPP) }
297 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
299 val messages = sink.sentMessages
300 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
301 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
303 assertThat(messages.size).isEqualTo(messageStreamSize)
304 assertThat(firstTopicMessagesCount)
305 .describedAs("amount of messages routed to first topic")
306 .isEqualTo(messageStreamSize)
308 assertThat(secondTopicMessagesCount)
309 .describedAs("amount of messages routed to second topic")
// The health state provider is expected to report HealthDescription.HEALTHY.
313 it("should mark the application healthy") {
314 assertThat(sut.healthStateProvider.currentHealth)
315 .describedAs("application health state")
316 .isEqualTo(HealthDescription.HEALTHY)
// The configuration provider is told to throw on update before updateConfiguration is called;
// the health state is then expected to be HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND.
320 given("failed configuration change") {
321 val (sut, _) = vesHvWithStoringSink()
322 sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
323 sut.configurationProvider.updateConfiguration(configWithBasicRouting)
325 it("should mark the application unhealthy ") {
326 assertThat(sut.healthStateProvider.currentHealth)
327 .describedAs("application health state")
328 .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
333 describe("request validation") {
// Sends a valid message ("first"), then one whose payload is Sut.MAX_PAYLOAD_SIZE_BYTES + 1 bytes, then another
// valid message. Only one message is expected to be handled, and its eventId must be "first".
334 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
335 val (sut, sink) = vesHvWithStoringSink()
337 val handledMessages = sut.handleConnection(sink,
338 vesWireFrameMessage(PERF3GPP, "first"),
339 messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
340 vesWireFrameMessage(PERF3GPP))
342 assertThat(handledMessages).hasSize(1)
343 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
// Test fixture factory: creates a fresh StoringSink, applies configWithBasicRouting through the
// sut's configuration provider and returns the sut together with the sink as a Pair.
349 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
350 val sink = StoringSink()
352 sut.configurationProvider.updateConfiguration(configWithBasicRouting)
353 return Pair(sut, sink)