/*
 * ============LICENSE_START=======================================================
 * dcaegen2-collectors-veshv
 * ================================================================================
 * Copyright (C) 2018 NOKIA
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.tests.fakes.*
27 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
28 import reactor.core.publisher.Flux
29 import java.time.Duration
/**
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 */
35 object VesHvSpecification : Spek({
38 describe("VES High Volume Collector") {
// Happy path: two HVRANMEAS events are sent over one connection and both are
// expected back from handleConnection.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should handle multiple HV RAN events") {
    val sink = StoringSink()

    sut.configurationProvider.updateConfiguration(basicConfiguration)
    val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))

    // A description alone verifies nothing: it needs an assertion subject and a
    // terminal check. Two events were sent, so two are expected.
    assertThat(messages)
        .describedAs("should send all events")
        .hasSize(2)
}
51 describe("Memory management") {
// Sends one valid message together with three that are expected to be dropped
// (invalid domain, invalid wire frame, too-big payload), then checks that only
// one event was handled and that every buffer's reference count is 0.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should release memory for each handled and dropped message") {
    val sink = StoringSink()
    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val routable = vesMessage(Domain.HVRANMEAS)
    val wrongDomain = vesMessage(Domain.OTHER)
    val brokenFrame = invalidWireFrame()
    val oversized = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
    val releasedRefCnt = 0

    val handledEvents = sut.handleConnection(sink, routable, wrongDomain, brokenFrame, oversized)
    assertThat(handledEvents).hasSize(1)

    // Same checks, same order and same descriptions for every buffer.
    listOf(
        routable to "handled message should be released",
        wrongDomain to "message with invalid domain should be released",
        brokenFrame to "message with invalid frame should be released",
        oversized to "message with payload exceeding 1MiB should be released"
    ).forEach { (buffer, description) ->
        assertThat(buffer.refCnt())
            .describedAs(description)
            .isEqualTo(releasedRefCnt)
    }
}
// Pairs a valid message with one produced by invalidVesMessage() and checks
// that one event is handled and both buffers end with reference count 0.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should release memory for each message with invalid payload") {
    val sink = StoringSink()
    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val wellFormed = vesMessage(Domain.HVRANMEAS)
    val brokenPayload = invalidVesMessage()
    val releasedRefCnt = 0

    val handledEvents = sut.handleConnection(sink, wellFormed, brokenPayload)
    assertThat(handledEvents).hasSize(1)

    assertThat(wellFormed.refCnt())
        .describedAs("handled message should be released")
        .isEqualTo(releasedRefCnt)
    assertThat(brokenPayload.refCnt())
        .describedAs("message with invalid payload should be released")
        .isEqualTo(releasedRefCnt)
}
// Pairs a valid message with one produced by garbageFrame() and checks that
// one event is handled and both buffers end with reference count 0.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should release memory for each message with garbage frame") {
    val sink = StoringSink()
    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val wellFormed = vesMessage(Domain.HVRANMEAS)
    val garbage = garbageFrame()
    val releasedRefCnt = 0

    val handledEvents = sut.handleConnection(sink, wellFormed, garbage)
    assertThat(handledEvents).hasSize(1)

    assertThat(wellFormed.refCnt())
        .describedAs("handled message should be released")
        .isEqualTo(releasedRefCnt)
    assertThat(garbage.refCnt())
        .describedAs("message with garbage frame should be released")
        .isEqualTo(releasedRefCnt)
}
125 describe("message routing") {
// With the basic configuration applied, a single HVRANMEAS message is expected
// to be routed to HVRANMEAS_TOPIC, partition 0.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should direct message to a topic by means of routing configuration") {
    val sink = StoringSink()
    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val routed = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
    assertThat(routed)
        .describedAs("number of routed messages")
        .hasSize(1)

    val onlyMessage = routed[0]
    assertThat(onlyMessage.topic)
        .describedAs("routed message topic")
        .isEqualTo(HVRANMEAS_TOPIC)
    assertThat(onlyMessage.partition)
        .describedAs("routed message partition")
        .isEqualTo(0)
}
// Sends three messages (HVRANMEAS, HEARTBEAT, MEASUREMENTS_FOR_VF_SCALING) under
// twoDomainsToOneTopicConfiguration: the first two are expected on
// HVRANMEAS_TOPIC and the last one on MEASUREMENTS_FOR_VF_SCALING_TOPIC.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should be able to direct 2 messages from different domains to one topic") {
    val sink = StoringSink()
    sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)

    val routed = sut.handleConnection(
        sink,
        vesMessage(Domain.HVRANMEAS),
        vesMessage(Domain.HEARTBEAT),
        vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))

    assertThat(routed)
        .describedAs("number of routed messages")
        .hasSize(3)

    // Safe to destructure: the size was asserted just above.
    val (first, second, last) = routed
    assertThat(first.topic)
        .describedAs("first message topic")
        .isEqualTo(HVRANMEAS_TOPIC)
    assertThat(second.topic)
        .describedAs("second message topic")
        .isEqualTo(HVRANMEAS_TOPIC)
    assertThat(last.topic)
        .describedAs("last message topic")
        .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
}
// Sends three messages with event ids "first" (OTHER), "second" (HVRANMEAS) and
// "third" (HEARTBEAT) under the basic configuration; only "second" is expected
// to be routed, to HVRANMEAS_TOPIC.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should drop message if route was not found") {
    val sink = StoringSink()
    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val routed = sut.handleConnection(
        sink,
        vesMessage(Domain.OTHER, "first"),
        vesMessage(Domain.HVRANMEAS, "second"),
        vesMessage(Domain.HEARTBEAT, "third"))

    assertThat(routed)
        .describedAs("number of routed messages")
        .hasSize(1)

    val survivor = routed[0]
    assertThat(survivor.topic)
        .describedAs("routed message topic")
        .isEqualTo(HVRANMEAS_TOPIC)
    assertThat(survivor.message.header.eventId)
        .describedAs("routed message eventId")
        .isEqualTo("second")
}
179 describe("configuration update") {
// Upper bound passed to the blocking Reactor calls in the tests of this group.
val defaultTimeout: Duration = Duration.ofSeconds(10)
// Reads sut.collector before and after a second configuration update and
// expects two different instances.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should update collector on configuration change") {
    val sink = StoringSink()

    sut.configurationProvider.updateConfiguration(basicConfiguration)
    val collectorBeforeUpdate = sut.collector

    sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)

    assertThat(sut.collector).isNotSameAs(collectorBeforeUpdate)
}
// Starts with configurationWithoutRouting (nothing is expected to be routed),
// then switches to basicConfiguration and expects the next HVRANMEAS message on
// HVRANMEAS_TOPIC, partition 0.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should start routing messages on configuration change") {
    val sink = StoringSink()

    sut.configurationProvider.updateConfiguration(configurationWithoutRouting)

    val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
    assertThat(messages).isEmpty()

    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
    assertThat(messagesAfterUpdate).hasSize(1)
    val message = messagesAfterUpdate[0]

    assertThat(message.topic)
        .describedAs("routed message topic after configuration's change")
        .isEqualTo(HVRANMEAS_TOPIC)
    // Without a terminal check this assertion verified nothing. Partition 0 is
    // what the "message routing" test expects for the same configuration and
    // domain.
    assertThat(message.partition)
        .describedAs("routed message partition")
        .isEqualTo(0)
}
// Routes one HVRANMEAS message under basicConfiguration, switches to
// configurationWithDifferentRouting and routes another one through the same
// sink; the second message is expected on ALTERNATE_HVRANMEAS_TOPIC.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should change domain routing on configuration change") {
    val sink = StoringSink()

    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
    assertThat(messages).hasSize(1)
    val firstMessage = messages[0]

    assertThat(firstMessage.topic)
        .describedAs("routed message topic on initial configuration")
        .isEqualTo(HVRANMEAS_TOPIC)
    // Terminal check added; without it the assertion verified nothing.
    // Partition 0 is what the "message routing" test expects for this
    // configuration and domain.
    assertThat(firstMessage.partition)
        .describedAs("routed message partition")
        .isEqualTo(0)

    sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)

    // Two messages are expected here although only one was just sent, so the
    // returned list evidently includes the message routed earlier via `sink`.
    val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
    assertThat(messagesAfterUpdate).hasSize(2)
    val secondMessage = messagesAfterUpdate[1]

    assertThat(secondMessage.topic)
        .describedAs("routed message topic after configuration's change")
        .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
    // NOTE(review): assumes configurationWithDifferentRouting also routes to
    // partition 0, like basicConfiguration — confirm against the fake's definition.
    assertThat(secondMessage.partition)
        .describedAs("routed message partition")
        .isEqualTo(0)
}
// Opens ten one-message connections in sequence and switches the routing
// configuration just before the sixth one, so five messages are expected on
// each topic.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should update routing for each client sending one message") {
    val sink = StoringSink()

    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val messagesAmount = 10
    val messagesForEachTopic = 5

    Flux.range(0, messagesAmount).doOnNext { connectionIndex ->
        // Switch routing once, right before the connection that starts the second half.
        if (connectionIndex == messagesForEachTopic) {
            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
        }
        sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
    }.then().block(defaultTimeout)

    val messages = sink.sentMessages
    val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
    val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }

    assertThat(messages.size).isEqualTo(messagesAmount)
    // The measured counts are the actual values and the constant is the expectation;
    // one assertion per topic so a failure names the topic that is off.
    assertThat(firstTopicMessagesCount)
        .describedAs("amount of messages routed to first topic")
        .isEqualTo(messagesForEachTopic)
    assertThat(secondTopicMessagesCount)
        .describedAs("amount of messages routed to second topic")
        .isEqualTo(messagesForEachTopic)
}
// Feeds ten messages through a single connection and changes the routing
// configuration while that stream is being consumed; every message is still
// expected on the first topic.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should not update routing for client sending continuous stream of messages") {
    val sink = StoringSink()

    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val messageStreamSize = 10
    // NOTE(review): the index at which the configuration should change is not
    // given in the visible lines; the middle of the stream is used — confirm.
    val configurationChangeIndex = messageStreamSize / 2

    // The configuration change must happen while the stream is flowing, not
    // before it is subscribed, otherwise the test does not exercise a mid-stream change.
    val incomingMessages = Flux.range(0, messageStreamSize)
        .doOnNext { index ->
            if (index == configurationChangeIndex) {
                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
            }
        }
        .map { vesMessage(Domain.HVRANMEAS) }

    sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)

    val messages = sink.sentMessages
    val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
    val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }

    assertThat(messages.size).isEqualTo(messageStreamSize)
    assertThat(firstTopicMessagesCount)
        .describedAs("amount of messages routed to first topic")
        .isEqualTo(messageStreamSize)

    // Every message is expected on the first topic, so none may reach the second one.
    assertThat(secondTopicMessagesCount)
        .describedAs("amount of messages routed to second topic")
        .isEqualTo(0)
}
312 describe("request validation") {
// Sends "first" (valid), "second" (too-big payload) and "third" (valid) over one
// connection; only "first" is expected to be handled.
// NOTE(review): `sut` is not declared in the visible lines — presumably it is
// built around `sink` on a line missing from this view; confirm.
it("should reject message with payload greater than 1 MiB and all subsequent messages") {
    val sink = StoringSink()
    sut.configurationProvider.updateConfiguration(basicConfiguration)

    val accepted = sut.handleConnection(
        sink,
        vesMessage(Domain.HVRANMEAS, "first"),
        vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
        vesMessage(Domain.HVRANMEAS, "third"))

    assertThat(accepted).hasSize(1)

    val acceptedEventId = accepted.first().message.header.eventId
    assertThat(acceptedEventId).isEqualTo("first")
}