2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.tests.fakes.*
27 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
28 import reactor.core.publisher.Flux
29 import java.time.Duration
32 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
35 object VesHvSpecification : Spek({
38 describe("VES High Volume Collector") {
39 it("should handle multiple HV RAN events") {
40 val (sut, sink) = vesHvWithStoringSink()
41 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
44 .describedAs("should send all events")
48 it("should not handle messages received from client after end-of-transmission message") {
49 val (sut, sink) = vesHvWithStoringSink()
50 val validMessage = vesMessage(Domain.HVRANMEAS)
51 val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
52 val endOfTransmissionMessage = endOfTransmissionMessage()
54 val handledEvents = sut.handleConnection(sink,
56 endOfTransmissionMessage,
60 assertThat(handledEvents).hasSize(1)
61 assertThat(validMessage.refCnt())
62 .describedAs("first message should be released")
64 assertThat(endOfTransmissionMessage.refCnt())
65 .describedAs("end-of-transmission message should be released")
67 assertThat(anotherValidMessage.refCnt())
68 .describedAs("second (not handled) message should not be released")
73 describe("Memory management") {
74 it("should release memory for each handled and dropped message") {
75 val (sut, sink) = vesHvWithStoringSink()
76 val validMessage = vesMessage(Domain.HVRANMEAS)
77 val msgWithInvalidDomain = vesMessage(Domain.OTHER)
78 val msgWithInvalidFrame = invalidWireFrame()
79 val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
80 val expectedRefCnt = 0
82 val handledEvents = sut.handleConnection(
83 sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
85 assertThat(handledEvents).hasSize(1)
87 assertThat(validMessage.refCnt())
88 .describedAs("handled message should be released")
89 .isEqualTo(expectedRefCnt)
90 assertThat(msgWithInvalidDomain.refCnt())
91 .describedAs("message with invalid domain should be released")
92 .isEqualTo(expectedRefCnt)
93 assertThat(msgWithInvalidFrame.refCnt())
94 .describedAs("message with invalid frame should be released")
95 .isEqualTo(expectedRefCnt)
96 assertThat(msgWithTooBigPayload.refCnt())
97 .describedAs("message with payload exceeding 1MiB should be released")
98 .isEqualTo(expectedRefCnt)
101 it("should release memory for end-of-transmission message") {
102 val (sut, sink) = vesHvWithStoringSink()
103 val validMessage = vesMessage(Domain.HVRANMEAS)
104 val endOfTransmissionMessage = endOfTransmissionMessage()
105 val expectedRefCnt = 0
107 val handledEvents = sut.handleConnection(sink,
109 endOfTransmissionMessage
112 assertThat(handledEvents).hasSize(1)
113 assertThat(validMessage.refCnt())
114 .describedAs("handled message should be released")
115 .isEqualTo(expectedRefCnt)
116 assertThat(endOfTransmissionMessage.refCnt())
117 .describedAs("end-of-transmission message should be released")
118 .isEqualTo(expectedRefCnt)
121 it("should release memory for each message with invalid payload") {
122 val (sut, sink) = vesHvWithStoringSink()
123 val validMessage = vesMessage(Domain.HVRANMEAS)
124 val msgWithInvalidPayload = invalidVesMessage()
125 val expectedRefCnt = 0
127 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
129 assertThat(handledEvents).hasSize(1)
131 assertThat(validMessage.refCnt())
132 .describedAs("handled message should be released")
133 .isEqualTo(expectedRefCnt)
134 assertThat(msgWithInvalidPayload.refCnt())
135 .describedAs("message with invalid payload should be released")
136 .isEqualTo(expectedRefCnt)
140 it("should release memory for each message with garbage frame") {
141 val (sut, sink) = vesHvWithStoringSink()
142 val validMessage = vesMessage(Domain.HVRANMEAS)
143 val msgWithGarbageFrame = garbageFrame()
144 val expectedRefCnt = 0
146 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
148 assertThat(handledEvents).hasSize(1)
150 assertThat(validMessage.refCnt())
151 .describedAs("handled message should be released")
152 .isEqualTo(expectedRefCnt)
153 assertThat(msgWithGarbageFrame.refCnt())
154 .describedAs("message with garbage frame should be released")
155 .isEqualTo(expectedRefCnt)
160 describe("message routing") {
161 it("should direct message to a topic by means of routing configuration") {
162 val (sut, sink) = vesHvWithStoringSink()
164 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
165 assertThat(messages).describedAs("number of routed messages").hasSize(1)
167 val msg = messages[0]
168 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
169 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
172 it("should be able to direct 2 messages from different domains to one topic") {
173 val (sut, sink) = vesHvWithStoringSink()
175 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
177 val messages = sut.handleConnection(sink,
178 vesMessage(Domain.HVRANMEAS),
179 vesMessage(Domain.HEARTBEAT),
180 vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
182 assertThat(messages).describedAs("number of routed messages").hasSize(3)
184 assertThat(messages[0].topic).describedAs("first message topic")
185 .isEqualTo(HVRANMEAS_TOPIC)
187 assertThat(messages[1].topic).describedAs("second message topic")
188 .isEqualTo(HVRANMEAS_TOPIC)
190 assertThat(messages[2].topic).describedAs("last message topic")
191 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
194 it("should drop message if route was not found") {
195 val (sut, sink) = vesHvWithStoringSink()
196 val messages = sut.handleConnection(sink,
197 vesMessage(Domain.OTHER, "first"),
198 vesMessage(Domain.HVRANMEAS, "second"),
199 vesMessage(Domain.HEARTBEAT, "third"))
201 assertThat(messages).describedAs("number of routed messages").hasSize(1)
203 val msg = messages[0]
204 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
205 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
209 describe("configuration update") {
211 val defaultTimeout = Duration.ofSeconds(10)
213 it("should update collector on configuration change") {
214 val (sut, _) = vesHvWithStoringSink()
216 sut.configurationProvider.updateConfiguration(basicConfiguration)
217 val firstCollector = sut.collector
219 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
220 val collectorAfterUpdate = sut.collector
222 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
226 it("should start routing messages on configuration change") {
227 val (sut, sink) = vesHvWithStoringSink()
229 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
231 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
232 assertThat(messages).isEmpty()
234 sut.configurationProvider.updateConfiguration(basicConfiguration)
236 val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
237 assertThat(messagesAfterUpdate).hasSize(1)
238 val message = messagesAfterUpdate[0]
240 assertThat(message.topic).describedAs("routed message topic after configuration's change")
241 .isEqualTo(HVRANMEAS_TOPIC)
242 assertThat(message.partition).describedAs("routed message partition")
246 it("should change domain routing on configuration change") {
247 val (sut, sink) = vesHvWithStoringSink()
249 sut.configurationProvider.updateConfiguration(basicConfiguration)
251 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
252 assertThat(messages).hasSize(1)
253 val firstMessage = messages[0]
255 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
256 .isEqualTo(HVRANMEAS_TOPIC)
257 assertThat(firstMessage.partition).describedAs("routed message partition")
261 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
263 val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
264 assertThat(messagesAfterUpdate).hasSize(2)
265 val secondMessage = messagesAfterUpdate[1]
267 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
268 .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
269 assertThat(secondMessage.partition).describedAs("routed message partition")
273 it("should update routing for each client sending one message") {
274 val (sut, sink) = vesHvWithStoringSink()
276 sut.configurationProvider.updateConfiguration(basicConfiguration)
278 val messagesAmount = 10
279 val messagesForEachTopic = 5
281 Flux.range(0, messagesAmount).doOnNext {
282 if (it == messagesForEachTopic) {
283 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
286 sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
287 }.then().block(defaultTimeout)
290 val messages = sink.sentMessages
291 val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
292 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
294 assertThat(messages.size).isEqualTo(messagesAmount)
295 assertThat(messagesForEachTopic)
296 .describedAs("amount of messages routed to each topic")
297 .isEqualTo(firstTopicMessagesCount)
298 .isEqualTo(secondTopicMessagesCount)
302 it("should not update routing for client sending continuous stream of messages") {
303 val (sut, sink) = vesHvWithStoringSink()
305 sut.configurationProvider.updateConfiguration(basicConfiguration)
307 val messageStreamSize = 10
310 val incomingMessages = Flux.range(0, messageStreamSize)
313 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
314 println("config changed")
317 .map { vesMessage(Domain.HVRANMEAS) }
320 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
322 val messages = sink.sentMessages
323 val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
324 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
326 assertThat(messages.size).isEqualTo(messageStreamSize)
327 assertThat(firstTopicMessagesCount)
328 .describedAs("amount of messages routed to first topic")
329 .isEqualTo(messageStreamSize)
331 assertThat(secondTopicMessagesCount)
332 .describedAs("amount of messages routed to second topic")
337 describe("request validation") {
338 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
339 val (sut, sink) = vesHvWithStoringSink()
341 val handledMessages = sut.handleConnection(sink,
342 vesMessage(Domain.HVRANMEAS, "first"),
343 vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
344 vesMessage(Domain.HVRANMEAS, "third"))
346 assertThat(handledMessages).hasSize(1)
347 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
353 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
354 val sink = StoringSink()
356 sut.configurationProvider.updateConfiguration(basicConfiguration)
357 return Pair(sut, sink)