2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.*
42 import reactor.core.publisher.Flux
43 import java.time.Duration
46 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
49 object VesHvSpecification : Spek({
52 describe("VES High Volume Collector") {
53 it("should handle multiple HV RAN events") {
54 val (sut, sink) = vesHvWithStoringSink()
55 val messages = sut.handleConnection(sink,
56 vesWireFrameMessage(PERF3GPP),
57 vesWireFrameMessage(PERF3GPP)
61 .describedAs("should send all events")
66 describe("Memory management") {
67 it("should release memory for each handled and dropped message") {
68 val (sut, sink) = vesHvWithStoringSink()
69 val validMessage = vesWireFrameMessage(PERF3GPP)
70 val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
71 val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
72 val expectedRefCnt = 0
74 val handledEvents = sut.handleConnection(
75 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
77 assertThat(handledEvents).hasSize(1)
79 assertThat(validMessage.refCnt())
80 .describedAs("handled message should be released")
81 .isEqualTo(expectedRefCnt)
82 assertThat(msgWithInvalidFrame.refCnt())
83 .describedAs("message with invalid frame should be released")
84 .isEqualTo(expectedRefCnt)
85 assertThat(msgWithTooBigPayload.refCnt())
86 .describedAs("message with payload exceeding 1MiB should be released")
87 .isEqualTo(expectedRefCnt)
90 it("should release memory for each message with invalid payload") {
91 val (sut, sink) = vesHvWithStoringSink()
92 val validMessage = vesWireFrameMessage(PERF3GPP)
93 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
94 val expectedRefCnt = 0
96 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
98 assertThat(handledEvents).hasSize(1)
100 assertThat(validMessage.refCnt())
101 .describedAs("handled message should be released")
102 .isEqualTo(expectedRefCnt)
103 assertThat(msgWithInvalidPayload.refCnt())
104 .describedAs("message with invalid payload should be released")
105 .isEqualTo(expectedRefCnt)
109 it("should release memory for each message with garbage frame") {
110 val (sut, sink) = vesHvWithStoringSink()
111 val validMessage = vesWireFrameMessage(PERF3GPP)
112 val msgWithGarbageFrame = garbageFrame()
113 val expectedRefCnt = 0
115 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
117 assertThat(handledEvents).hasSize(1)
119 assertThat(validMessage.refCnt())
120 .describedAs("handled message should be released")
121 .isEqualTo(expectedRefCnt)
122 assertThat(msgWithGarbageFrame.refCnt())
123 .describedAs("message with garbage frame should be released")
124 .isEqualTo(expectedRefCnt)
129 describe("message routing") {
130 it("should direct message to a topic by means of routing configuration") {
131 val (sut, sink) = vesHvWithStoringSink()
133 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
134 assertThat(messages).describedAs("number of routed messages").hasSize(1)
136 val msg = messages[0]
137 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
138 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
141 it("should be able to direct 2 messages from different domains to one topic") {
142 val (sut, sink) = vesHvWithStoringSink()
144 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
146 val messages = sut.handleConnection(sink,
147 vesWireFrameMessage(PERF3GPP),
148 vesWireFrameMessage(HEARTBEAT),
149 vesWireFrameMessage(MEASUREMENT))
151 assertThat(messages).describedAs("number of routed messages").hasSize(3)
153 assertThat(messages[0].topic).describedAs("first message topic")
154 .isEqualTo(PERF3GPP_TOPIC)
156 assertThat(messages[1].topic).describedAs("second message topic")
157 .isEqualTo(PERF3GPP_TOPIC)
159 assertThat(messages[2].topic).describedAs("last message topic")
160 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
163 it("should drop message if route was not found") {
164 val (sut, sink) = vesHvWithStoringSink()
165 val messages = sut.handleConnection(sink,
166 vesWireFrameMessage(OTHER, "first"),
167 vesWireFrameMessage(PERF3GPP, "second"),
168 vesWireFrameMessage(HEARTBEAT, "third"))
170 assertThat(messages).describedAs("number of routed messages").hasSize(1)
172 val msg = messages[0]
173 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
174 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
178 describe("configuration update") {
180 val defaultTimeout = Duration.ofSeconds(10)
182 given("successful configuration change") {
184 lateinit var sut: Sut
185 lateinit var sink: StoringSink
188 vesHvWithStoringSink().run {
194 it("should update collector") {
195 val firstCollector = sut.collector
197 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
198 val collectorAfterUpdate = sut.collector
200 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
203 it("should start routing messages") {
205 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
207 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
208 assertThat(messages).isEmpty()
210 sut.configurationProvider.updateConfiguration(basicConfiguration)
212 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
213 assertThat(messagesAfterUpdate).hasSize(1)
214 val message = messagesAfterUpdate[0]
216 assertThat(message.topic).describedAs("routed message topic after configuration's change")
217 .isEqualTo(PERF3GPP_TOPIC)
218 assertThat(message.partition).describedAs("routed message partition")
222 it("should change domain routing") {
224 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
225 assertThat(messages).hasSize(1)
226 val firstMessage = messages[0]
228 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
229 .isEqualTo(PERF3GPP_TOPIC)
230 assertThat(firstMessage.partition).describedAs("routed message partition")
234 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
236 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
237 assertThat(messagesAfterUpdate).hasSize(2)
238 val secondMessage = messagesAfterUpdate[1]
240 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
241 .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
242 assertThat(secondMessage.partition).describedAs("routed message partition")
246 it("should update routing for each client sending one message") {
248 val messagesAmount = 10
249 val messagesForEachTopic = 5
251 Flux.range(0, messagesAmount).doOnNext {
252 if (it == messagesForEachTopic) {
253 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
256 sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
257 }.then().block(defaultTimeout)
260 val messages = sink.sentMessages
261 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
262 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
264 assertThat(messages.size).isEqualTo(messagesAmount)
265 assertThat(messagesForEachTopic)
266 .describedAs("amount of messages routed to each topic")
267 .isEqualTo(firstTopicMessagesCount)
268 .isEqualTo(secondTopicMessagesCount)
271 it("should not update routing for client sending continuous stream of messages") {
273 val messageStreamSize = 10
276 val incomingMessages = Flux.range(0, messageStreamSize)
279 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
280 println("config changed")
283 .map { vesWireFrameMessage(PERF3GPP) }
286 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
288 val messages = sink.sentMessages
289 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
290 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
292 assertThat(messages.size).isEqualTo(messageStreamSize)
293 assertThat(firstTopicMessagesCount)
294 .describedAs("amount of messages routed to first topic")
295 .isEqualTo(messageStreamSize)
297 assertThat(secondTopicMessagesCount)
298 .describedAs("amount of messages routed to second topic")
302 it("should mark the application healthy") {
303 assertThat(sut.healthStateProvider.currentHealth)
304 .describedAs("application health state")
305 .isEqualTo(HealthDescription.HEALTHY)
309 given("failed configuration change") {
310 val (sut, _) = vesHvWithStoringSink()
311 sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
312 sut.configurationProvider.updateConfiguration(basicConfiguration)
314 it("should mark the application unhealthy ") {
315 assertThat(sut.healthStateProvider.currentHealth)
316 .describedAs("application health state")
317 .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
322 describe("request validation") {
323 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
324 val (sut, sink) = vesHvWithStoringSink()
326 val handledMessages = sut.handleConnection(sink,
327 vesWireFrameMessage(PERF3GPP, "first"),
328 messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
329 vesWireFrameMessage(PERF3GPP))
331 assertThat(handledMessages).hasSize(1)
332 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
338 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
339 val sink = StoringSink()
341 sut.configurationProvider.updateConfiguration(basicConfiguration)
342 return Pair(sut, sink)