2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
43 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
46 import reactor.core.publisher.Flux
47 import java.time.Duration
50 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Component-level Spek specification for the VES HV collector. The groups below exercise the
// system under test (`sut`) obtained from vesHvWithStoringSink() and assert on what it reports.
53 object VesHvSpecification : Spek({
// Basic event handling over a single connection.
56 describe("VES High Volume Collector") {
57 it("should handle multiple HV RAN events") {
// Fresh sut/sink pair for this test.
58 val (sut, sink) = vesHvWithStoringSink()
// Two PERF3GPP wire frames are passed to one handleConnection call.
59 val messages = sut.handleConnection(sink,
60 vesWireFrameMessage(PERF3GPP),
61 vesWireFrameMessage(PERF3GPP)
65 .describedAs("should send all events")
// Each test in this group passes one valid message plus one or more malformed ones through
// handleConnection and then asserts that refCnt() of every input is 0 and that exactly one
// event was handled.
// NOTE(review): refCnt() suggests Netty reference-counted buffers — confirm against the test utils.
70 describe("Memory management") {
71 it("should release memory for each handled and dropped message") {
72 val (sut, sink) = vesHvWithStoringSink()
73 val validMessage = vesWireFrameMessage(PERF3GPP)
74 val msgWithInvalidFrame = invalidWireFrame()
// Payload is one byte over Sut.MAX_PAYLOAD_SIZE_BYTES.
75 val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
// Reference count expected for every message once the connection has been handled.
76 val expectedRefCnt = 0
78 val handledEvents = sut.handleConnection(
79 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
// Only one of the three inputs is expected to come out as a handled event.
81 assertThat(handledEvents).hasSize(1)
83 assertThat(validMessage.refCnt())
84 .describedAs("handled message should be released")
85 .isEqualTo(expectedRefCnt)
86 assertThat(msgWithInvalidFrame.refCnt())
87 .describedAs("message with invalid frame should be released")
88 .isEqualTo(expectedRefCnt)
89 assertThat(msgWithTooBigPayload.refCnt())
90 .describedAs("message with payload exceeding 1MiB should be released")
91 .isEqualTo(expectedRefCnt)
// Same check, with the malformed input produced by wireFrameMessageWithInvalidPayload().
94 it("should release memory for each message with invalid payload") {
95 val (sut, sink) = vesHvWithStoringSink()
96 val validMessage = vesWireFrameMessage(PERF3GPP)
97 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
98 val expectedRefCnt = 0
100 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
102 assertThat(handledEvents).hasSize(1)
104 assertThat(validMessage.refCnt())
105 .describedAs("handled message should be released")
106 .isEqualTo(expectedRefCnt)
107 assertThat(msgWithInvalidPayload.refCnt())
108 .describedAs("message with invalid payload should be released")
109 .isEqualTo(expectedRefCnt)
// Same check, with the malformed input produced by garbageFrame().
113 it("should release memory for each message with garbage frame") {
114 val (sut, sink) = vesHvWithStoringSink()
115 val validMessage = vesWireFrameMessage(PERF3GPP)
116 val msgWithGarbageFrame = garbageFrame()
117 val expectedRefCnt = 0
119 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
121 assertThat(handledEvents).hasSize(1)
123 assertThat(validMessage.refCnt())
124 .describedAs("handled message should be released")
125 .isEqualTo(expectedRefCnt)
126 assertThat(msgWithGarbageFrame.refCnt())
127 .describedAs("message with garbage frame should be released")
128 .isEqualTo(expectedRefCnt)
// Checks which topic (and partition) routed messages carry for a given routing configuration.
133 describe("message routing") {
// Uses the configuration applied by vesHvWithStoringSink() (basicConfiguration).
134 it("should direct message to a topic by means of routing configuration") {
135 val (sut, sink) = vesHvWithStoringSink()
137 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
138 assertThat(messages).describedAs("number of routed messages").hasSize(1)
140 val msg = messages[0]
141 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
142 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
145 it("should be able to direct 2 messages from different domains to one topic") {
146 val (sut, sink) = vesHvWithStoringSink()
// Replace the helper's configuration before any message is sent.
148 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
150 val messages = sut.handleConnection(sink,
151 vesWireFrameMessage(PERF3GPP),
152 vesWireFrameMessage(HEARTBEAT),
153 vesWireFrameMessage(MEASUREMENT))
155 assertThat(messages).describedAs("number of routed messages").hasSize(3)
// PERF3GPP and HEARTBEAT are both expected on PERF3GPP_TOPIC; MEASUREMENT on its own topic.
157 assertThat(messages[0].topic).describedAs("first message topic")
158 .isEqualTo(PERF3GPP_TOPIC)
160 assertThat(messages[1].topic).describedAs("second message topic")
161 .isEqualTo(PERF3GPP_TOPIC)
163 assertThat(messages[2].topic).describedAs("last message topic")
164 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
167 it("should drop message if route was not found") {
168 val (sut, sink) = vesHvWithStoringSink()
// Each frame carries a distinct event id so the surviving message can be identified below.
169 val messages = sut.handleConnection(sink,
170 vesWireFrameMessage(OTHER, "first"),
171 vesWireFrameMessage(PERF3GPP, "second"),
172 vesWireFrameMessage(HEARTBEAT, "third"))
// Of the three, only the PERF3GPP message ("second") is expected to be routed.
174 assertThat(messages).describedAs("number of routed messages").hasSize(1)
176 val msg = messages[0]
177 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
178 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
// Verifies how the sut reacts to configuration changes pushed through
// sut.configurationProvider.updateConfiguration(...): collector replacement, routing changes,
// and the reported health state.
182 describe("configuration update") {
// Upper bound used when blocking on the Reactor pipelines in this group.
184 val defaultTimeout = Duration.ofSeconds(10)
186 given("successful configuration change") {
// Shared by every test in this `given` block.
// NOTE(review): presumably assigned from the vesHvWithStoringSink() result below — confirm they
// are re-created per test, because several tests assert on the sink's cumulative contents.
188 lateinit var sut: Sut
189 lateinit var sink: StoringSink
192 vesHvWithStoringSink().run {
// A configuration update is expected to yield a different collector instance.
198 it("should update collector") {
199 val firstCollector = sut.collector
201 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
202 val collectorAfterUpdate = sut.collector
204 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
207 it("should start routing messages") {
// With configurationWithoutRouting nothing is expected to be routed...
209 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
211 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
212 assertThat(messages).isEmpty()
// ...and after switching to basicConfiguration the next message reaches PERF3GPP_TOPIC.
214 sut.configurationProvider.updateConfiguration(basicConfiguration)
216 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
217 assertThat(messagesAfterUpdate).hasSize(1)
218 val message = messagesAfterUpdate[0]
220 assertThat(message.topic).describedAs("routed message topic after configuration's change")
221 .isEqualTo(PERF3GPP_TOPIC)
222 assertThat(message.partition).describedAs("routed message partition")
226 it("should change domain routing") {
// First message is sent under the initial configuration.
228 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
229 assertThat(messages).hasSize(1)
230 val firstMessage = messages[0]
232 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
233 .isEqualTo(PERF3GPP_TOPIC)
234 assertThat(firstMessage.partition).describedAs("routed message partition")
238 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
// The second call is expected to return two messages, i.e. the result also contains the
// message from the first call; index 1 is the one sent after the update.
240 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
241 assertThat(messagesAfterUpdate).hasSize(2)
242 val secondMessage = messagesAfterUpdate[1]
244 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
245 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
246 assertThat(secondMessage.partition).describedAs("routed message partition")
// Sends messagesAmount messages, each through its own handleConnection call, and switches the
// routing configuration when the counter reaches messagesForEachTopic.
250 it("should update routing for each client sending one message") {
252 val messagesAmount = 10
253 val messagesForEachTopic = 5
255 Flux.range(0, messagesAmount).doOnNext {
256 if (it == messagesForEachTopic) {
257 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
260 sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
261 }.then().block(defaultTimeout)
264 val messages = sink.sentMessages
265 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
266 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
268 assertThat(messages.size).isEqualTo(messagesAmount)
// NOTE(review): actual and expected are swapped — the constant is the assertion subject and the
// measured counts are the expected values, so a failure message will read backwards.
269 assertThat(messagesForEachTopic)
270 .describedAs("amount of messages routed to each topic")
271 .isEqualTo(firstTopicMessagesCount)
272 .isEqualTo(secondTopicMessagesCount)
// Feeds one Flux of messages straight into sut.collector.handleConnection (a single call) and
// expects every message on the first topic even though the configuration is changed meanwhile.
275 it("should not update routing for client sending continuous stream of messages") {
277 val messageStreamSize = 10
280 val incomingMessages = Flux.range(0, messageStreamSize)
283 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
// NOTE(review): debug output left in the test — consider removing or using a logger.
284 println("config changed")
287 .map { vesWireFrameMessage(PERF3GPP) }
290 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
292 val messages = sink.sentMessages
293 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
294 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
296 assertThat(messages.size).isEqualTo(messageStreamSize)
297 assertThat(firstTopicMessagesCount)
298 .describedAs("amount of messages routed to first topic")
299 .isEqualTo(messageStreamSize)
301 assertThat(secondTopicMessagesCount)
302 .describedAs("amount of messages routed to second topic")
306 it("should mark the application healthy") {
307 assertThat(sut.healthStateProvider.currentHealth)
308 .describedAs("application health state")
309 .isEqualTo(HealthDescription.HEALTHY)
313 given("failed configuration change") {
// The provider is told to throw on update before a configuration update is attempted.
314 val (sut, _) = vesHvWithStoringSink()
315 sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
316 sut.configurationProvider.updateConfiguration(basicConfiguration)
318 it("should mark the application unhealthy ") {
319 assertThat(sut.healthStateProvider.currentHealth)
320 .describedAs("application health state")
321 .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
// Checks that an oversized payload stops processing of the rest of the connection.
326 describe("request validation") {
327 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
328 val (sut, sink) = vesHvWithStoringSink()
// Order matters: a valid message, then one a byte over Sut.MAX_PAYLOAD_SIZE_BYTES, then another valid one.
330 val handledMessages = sut.handleConnection(sink,
331 vesWireFrameMessage(PERF3GPP, "first"),
332 vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
333 vesWireFrameMessage(PERF3GPP))
// Only the message sent before the oversized one is expected to be handled.
335 assertThat(handledMessages).hasSize(1)
336 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
/**
 * Test fixture factory: creates a [StoringSink], applies `basicConfiguration` through the
 * sut's configuration provider, and hands both back to the caller.
 *
 * @return the system under test paired with the sink created here
 */
342 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
343 val sink = StoringSink()
// Every fixture starts from basicConfiguration; tests override it via updateConfiguration.
345 sut.configurationProvider.updateConfiguration(basicConfiguration)
346 return Pair(sut, sink)