2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.tests.fakes.*
27 import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
28 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
29 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
30 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
31 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
32 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
33 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
34 import reactor.core.publisher.Flux
35 import java.time.Duration
38 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
41 object VesHvSpecification : Spek({
44 describe("VES High Volume Collector") {
45 it("should handle multiple HV RAN events") {
46 val (sut, sink) = vesHvWithStoringSink()
47 val messages = sut.handleConnection(sink,
48 vesWireFrameMessage(Domain.HVRANMEAS),
49 vesWireFrameMessage(Domain.HVRANMEAS)
53 .describedAs("should send all events")
57 it("should not handle messages received from client after end-of-transmission message") {
58 val (sut, sink) = vesHvWithStoringSink()
59 val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
60 val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS)
61 val endOfTransmissionMessage = endOfTransmissionWireMessage()
63 val handledEvents = sut.handleConnection(sink,
65 endOfTransmissionMessage,
69 assertThat(handledEvents).hasSize(1)
70 assertThat(validMessage.refCnt())
71 .describedAs("first message should be released")
73 assertThat(endOfTransmissionMessage.refCnt())
74 .describedAs("end-of-transmission message should be released")
76 assertThat(anotherValidMessage.refCnt())
77 .describedAs("second (not handled) message should not be released")
82 describe("Memory management") {
83 it("should release memory for each handled and dropped message") {
84 val (sut, sink) = vesHvWithStoringSink()
85 val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
86 val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER)
87 val msgWithInvalidFrame = invalidWireFrame()
88 val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
89 val expectedRefCnt = 0
91 val handledEvents = sut.handleConnection(
92 sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
94 assertThat(handledEvents).hasSize(1)
96 assertThat(validMessage.refCnt())
97 .describedAs("handled message should be released")
98 .isEqualTo(expectedRefCnt)
99 assertThat(msgWithInvalidDomain.refCnt())
100 .describedAs("message with invalid domain should be released")
101 .isEqualTo(expectedRefCnt)
102 assertThat(msgWithInvalidFrame.refCnt())
103 .describedAs("message with invalid frame should be released")
104 .isEqualTo(expectedRefCnt)
105 assertThat(msgWithTooBigPayload.refCnt())
106 .describedAs("message with payload exceeding 1MiB should be released")
107 .isEqualTo(expectedRefCnt)
110 it("should release memory for end-of-transmission message") {
111 val (sut, sink) = vesHvWithStoringSink()
112 val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
113 val endOfTransmissionMessage = endOfTransmissionWireMessage()
114 val expectedRefCnt = 0
116 val handledEvents = sut.handleConnection(sink,
118 endOfTransmissionMessage
121 assertThat(handledEvents).hasSize(1)
122 assertThat(validMessage.refCnt())
123 .describedAs("handled message should be released")
124 .isEqualTo(expectedRefCnt)
125 assertThat(endOfTransmissionMessage.refCnt())
126 .describedAs("end-of-transmission message should be released")
127 .isEqualTo(expectedRefCnt)
130 it("should release memory for each message with invalid payload") {
131 val (sut, sink) = vesHvWithStoringSink()
132 val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
133 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
134 val expectedRefCnt = 0
136 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
138 assertThat(handledEvents).hasSize(1)
140 assertThat(validMessage.refCnt())
141 .describedAs("handled message should be released")
142 .isEqualTo(expectedRefCnt)
143 assertThat(msgWithInvalidPayload.refCnt())
144 .describedAs("message with invalid payload should be released")
145 .isEqualTo(expectedRefCnt)
149 it("should release memory for each message with garbage frame") {
150 val (sut, sink) = vesHvWithStoringSink()
151 val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
152 val msgWithGarbageFrame = garbageFrame()
153 val expectedRefCnt = 0
155 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
157 assertThat(handledEvents).hasSize(1)
159 assertThat(validMessage.refCnt())
160 .describedAs("handled message should be released")
161 .isEqualTo(expectedRefCnt)
162 assertThat(msgWithGarbageFrame.refCnt())
163 .describedAs("message with garbage frame should be released")
164 .isEqualTo(expectedRefCnt)
169 describe("message routing") {
170 it("should direct message to a topic by means of routing configuration") {
171 val (sut, sink) = vesHvWithStoringSink()
173 val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
174 assertThat(messages).describedAs("number of routed messages").hasSize(1)
176 val msg = messages[0]
177 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
178 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
181 it("should be able to direct 2 messages from different domains to one topic") {
182 val (sut, sink) = vesHvWithStoringSink()
184 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
186 val messages = sut.handleConnection(sink,
187 vesWireFrameMessage(Domain.HVRANMEAS),
188 vesWireFrameMessage(Domain.HEARTBEAT),
189 vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
191 assertThat(messages).describedAs("number of routed messages").hasSize(3)
193 assertThat(messages[0].topic).describedAs("first message topic")
194 .isEqualTo(HVRANMEAS_TOPIC)
196 assertThat(messages[1].topic).describedAs("second message topic")
197 .isEqualTo(HVRANMEAS_TOPIC)
199 assertThat(messages[2].topic).describedAs("last message topic")
200 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
203 it("should drop message if route was not found") {
204 val (sut, sink) = vesHvWithStoringSink()
205 val messages = sut.handleConnection(sink,
206 vesWireFrameMessage(Domain.OTHER, "first"),
207 vesWireFrameMessage(Domain.HVRANMEAS, "second"),
208 vesWireFrameMessage(Domain.HEARTBEAT, "third"))
210 assertThat(messages).describedAs("number of routed messages").hasSize(1)
212 val msg = messages[0]
213 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
214 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
218 describe("configuration update") {
220 val defaultTimeout = Duration.ofSeconds(10)
222 it("should update collector on configuration change") {
223 val (sut, _) = vesHvWithStoringSink()
225 sut.configurationProvider.updateConfiguration(basicConfiguration)
226 val firstCollector = sut.collector
228 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
229 val collectorAfterUpdate = sut.collector
231 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
235 it("should start routing messages on configuration change") {
236 val (sut, sink) = vesHvWithStoringSink()
238 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
240 val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
241 assertThat(messages).isEmpty()
243 sut.configurationProvider.updateConfiguration(basicConfiguration)
245 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
246 assertThat(messagesAfterUpdate).hasSize(1)
247 val message = messagesAfterUpdate[0]
249 assertThat(message.topic).describedAs("routed message topic after configuration's change")
250 .isEqualTo(HVRANMEAS_TOPIC)
251 assertThat(message.partition).describedAs("routed message partition")
255 it("should change domain routing on configuration change") {
256 val (sut, sink) = vesHvWithStoringSink()
258 sut.configurationProvider.updateConfiguration(basicConfiguration)
260 val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
261 assertThat(messages).hasSize(1)
262 val firstMessage = messages[0]
264 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
265 .isEqualTo(HVRANMEAS_TOPIC)
266 assertThat(firstMessage.partition).describedAs("routed message partition")
270 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
272 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
273 assertThat(messagesAfterUpdate).hasSize(2)
274 val secondMessage = messagesAfterUpdate[1]
276 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
277 .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
278 assertThat(secondMessage.partition).describedAs("routed message partition")
282 it("should update routing for each client sending one message") {
283 val (sut, sink) = vesHvWithStoringSink()
285 sut.configurationProvider.updateConfiguration(basicConfiguration)
287 val messagesAmount = 10
288 val messagesForEachTopic = 5
290 Flux.range(0, messagesAmount).doOnNext {
291 if (it == messagesForEachTopic) {
292 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
295 sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
296 }.then().block(defaultTimeout)
299 val messages = sink.sentMessages
300 val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
301 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
303 assertThat(messages.size).isEqualTo(messagesAmount)
304 assertThat(messagesForEachTopic)
305 .describedAs("amount of messages routed to each topic")
306 .isEqualTo(firstTopicMessagesCount)
307 .isEqualTo(secondTopicMessagesCount)
311 it("should not update routing for client sending continuous stream of messages") {
312 val (sut, sink) = vesHvWithStoringSink()
314 sut.configurationProvider.updateConfiguration(basicConfiguration)
316 val messageStreamSize = 10
319 val incomingMessages = Flux.range(0, messageStreamSize)
322 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
323 println("config changed")
326 .map { vesWireFrameMessage(Domain.HVRANMEAS) }
329 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
331 val messages = sink.sentMessages
332 val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
333 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
335 assertThat(messages.size).isEqualTo(messageStreamSize)
336 assertThat(firstTopicMessagesCount)
337 .describedAs("amount of messages routed to first topic")
338 .isEqualTo(messageStreamSize)
340 assertThat(secondTopicMessagesCount)
341 .describedAs("amount of messages routed to second topic")
346 describe("request validation") {
347 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
348 val (sut, sink) = vesHvWithStoringSink()
350 val handledMessages = sut.handleConnection(sink,
351 vesWireFrameMessage(Domain.HVRANMEAS, "first"),
352 vesMessageWithTooBigPayload(Domain.HVRANMEAS),
353 vesWireFrameMessage(Domain.HVRANMEAS))
355 assertThat(handledMessages).hasSize(1)
356 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
362 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
363 val sink = StoringSink()
365 sut.configurationProvider.updateConfiguration(basicConfiguration)
366 return Pair(sut, sink)