2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
41 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
43 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
44 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
45 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
47 import reactor.core.publisher.Flux
48 import java.time.Duration
51 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Component test specification for the VES-HV collector (package tests.component).
// Most scenarios obtain a (sut, sink) pair from vesHvWithStoringSink(), push wire frames
// through sut.handleConnection(sink, ...) and assert on the list it returns.
54 object VesHvSpecification : Spek({
// Basic message-handling scenarios.
57 describe("VES High Volume Collector") {
// Sends two HVMEAS wire frames over a single connection.
58 it("should handle multiple HV RAN events") {
59 val (sut, sink) = vesHvWithStoringSink()
60 val messages = sut.handleConnection(sink,
61 vesWireFrameMessage(HVMEAS),
62 vesWireFrameMessage(HVMEAS)
66 .describedAs("should send all events")
// Frames are kept in named locals so their refCnt() can be inspected after the connection
// has been handled.
70 it("should not handle messages received from client after end-of-transmission message") {
71 val (sut, sink) = vesHvWithStoringSink()
72 val validMessage = vesWireFrameMessage(HVMEAS)
73 val anotherValidMessage = vesWireFrameMessage(HVMEAS)
74 val endOfTransmissionMessage = endOfTransmissionWireMessage()
76 val handledEvents = sut.handleConnection(sink,
78 endOfTransmissionMessage,
// Exactly one event is expected to be handled.
82 assertThat(handledEvents).hasSize(1)
// Per the assertion descriptions below: the first frame and the end-of-transmission frame
// should be released, while the second frame should not be released.
83 assertThat(validMessage.refCnt())
84 .describedAs("first message should be released")
86 assertThat(endOfTransmissionMessage.refCnt())
87 .describedAs("end-of-transmission message should be released")
89 assertThat(anotherValidMessage.refCnt())
90 .describedAs("second (not handled) message should not be released")
// Buffer-release scenarios: each test sends one valid HVMEAS frame together with one or more
// other frames, expects exactly one handled event, and then requires refCnt() of every
// frame that was sent to equal expectedRefCnt (0).
95 describe("Memory management") {
// Valid frame + frame with invalid wire format + frame whose payload is too big.
96 it("should release memory for each handled and dropped message") {
97 val (sut, sink) = vesHvWithStoringSink()
98 val validMessage = vesWireFrameMessage(HVMEAS)
99 val msgWithInvalidFrame = invalidWireFrame()
100 val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
101 val expectedRefCnt = 0
103 val handledEvents = sut.handleConnection(
104 sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
// Only the valid frame counts as handled.
106 assertThat(handledEvents).hasSize(1)
108 assertThat(validMessage.refCnt())
109 .describedAs("handled message should be released")
110 .isEqualTo(expectedRefCnt)
111 assertThat(msgWithInvalidFrame.refCnt())
112 .describedAs("message with invalid frame should be released")
113 .isEqualTo(expectedRefCnt)
114 assertThat(msgWithTooBigPayload.refCnt())
115 .describedAs("message with payload exceeding 1MiB should be released")
116 .isEqualTo(expectedRefCnt)
// Valid frame followed by an end-of-transmission frame.
119 it("should release memory for end-of-transmission message") {
120 val (sut, sink) = vesHvWithStoringSink()
121 val validMessage = vesWireFrameMessage(HVMEAS)
122 val endOfTransmissionMessage = endOfTransmissionWireMessage()
123 val expectedRefCnt = 0
125 val handledEvents = sut.handleConnection(sink,
127 endOfTransmissionMessage
130 assertThat(handledEvents).hasSize(1)
131 assertThat(validMessage.refCnt())
132 .describedAs("handled message should be released")
133 .isEqualTo(expectedRefCnt)
134 assertThat(endOfTransmissionMessage.refCnt())
135 .describedAs("end-of-transmission message should be released")
136 .isEqualTo(expectedRefCnt)
// Valid frame + frame built by wireFrameMessageWithInvalidPayload().
139 it("should release memory for each message with invalid payload") {
140 val (sut, sink) = vesHvWithStoringSink()
141 val validMessage = vesWireFrameMessage(HVMEAS)
142 val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
143 val expectedRefCnt = 0
145 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
147 assertThat(handledEvents).hasSize(1)
149 assertThat(validMessage.refCnt())
150 .describedAs("handled message should be released")
151 .isEqualTo(expectedRefCnt)
152 assertThat(msgWithInvalidPayload.refCnt())
153 .describedAs("message with invalid payload should be released")
154 .isEqualTo(expectedRefCnt)
// Valid frame + frame built by garbageFrame().
158 it("should release memory for each message with garbage frame") {
159 val (sut, sink) = vesHvWithStoringSink()
160 val validMessage = vesWireFrameMessage(HVMEAS)
161 val msgWithGarbageFrame = garbageFrame()
162 val expectedRefCnt = 0
164 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
166 assertThat(handledEvents).hasSize(1)
168 assertThat(validMessage.refCnt())
169 .describedAs("handled message should be released")
170 .isEqualTo(expectedRefCnt)
171 assertThat(msgWithGarbageFrame.refCnt())
172 .describedAs("message with garbage frame should be released")
173 .isEqualTo(expectedRefCnt)
// Routing scenarios: verify which topic (and partition) each routed message ends up with.
178 describe("message routing") {
// With the configuration applied by vesHvWithStoringSink(), a single HVMEAS frame is
// expected on HVMEAS_TOPIC, partition 0.
179 it("should direct message to a topic by means of routing configuration") {
180 val (sut, sink) = vesHvWithStoringSink()
182 val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
183 assertThat(messages).describedAs("number of routed messages").hasSize(1)
185 val msg = messages[0]
186 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
187 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
// After switching to twoDomainsToOneTopicConfiguration, HVMEAS and HEARTBEAT are both
// expected on HVMEAS_TOPIC while MEASUREMENT goes to MEASUREMENTS_FOR_VF_SCALING_TOPIC.
190 it("should be able to direct 2 messages from different domains to one topic") {
191 val (sut, sink) = vesHvWithStoringSink()
193 sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
195 val messages = sut.handleConnection(sink,
196 vesWireFrameMessage(HVMEAS),
197 vesWireFrameMessage(HEARTBEAT),
198 vesWireFrameMessage(MEASUREMENT))
200 assertThat(messages).describedAs("number of routed messages").hasSize(3)
202 assertThat(messages[0].topic).describedAs("first message topic")
203 .isEqualTo(HVMEAS_TOPIC)
205 assertThat(messages[1].topic).describedAs("second message topic")
206 .isEqualTo(HVMEAS_TOPIC)
208 assertThat(messages[2].topic).describedAs("last message topic")
209 .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
// Three frames with distinct event ids are sent; only the HVMEAS one ("second") is expected
// to be routed, so the OTHER and HEARTBEAT frames must have been dropped.
212 it("should drop message if route was not found") {
213 val (sut, sink) = vesHvWithStoringSink()
214 val messages = sut.handleConnection(sink,
215 vesWireFrameMessage(OTHER, "first"),
216 vesWireFrameMessage(HVMEAS, "second"),
217 vesWireFrameMessage(HEARTBEAT, "third"))
219 assertThat(messages).describedAs("number of routed messages").hasSize(1)
221 val msg = messages[0]
222 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
223 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
// Scenarios covering runtime configuration changes pushed through
// sut.configurationProvider.updateConfiguration(...).
227 describe("configuration update") {
// Upper bound for the blocking waits on reactive pipelines below.
229 val defaultTimeout = Duration.ofSeconds(10)
231 given("successful configuration change") {
// Fixture shared by the scenarios of this group, populated from vesHvWithStoringSink().
// NOTE(review): presumably re-created before each test - confirm.
233 lateinit var sut: Sut
234 lateinit var sink: StoringSink
237 vesHvWithStoringSink().run {
// A configuration update is expected to replace the collector instance (identity check).
243 it("should update collector") {
244 val firstCollector = sut.collector
246 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
247 val collectorAfterUpdate = sut.collector
249 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
// With configurationWithoutRouting nothing is routed; after switching to basicConfiguration
// the same kind of frame is routed to HVMEAS_TOPIC.
252 it("should start routing messages") {
254 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
256 val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
257 assertThat(messages).isEmpty()
259 sut.configurationProvider.updateConfiguration(basicConfiguration)
261 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
262 assertThat(messagesAfterUpdate).hasSize(1)
263 val message = messagesAfterUpdate[0]
265 assertThat(message.topic).describedAs("routed message topic after configuration's change")
266 .isEqualTo(HVMEAS_TOPIC)
267 assertThat(message.partition).describedAs("routed message partition")
// The same domain is routed to HVMEAS_TOPIC first and to ALTERNATE_HVMEAS_TOPIC after the
// routing configuration has been swapped.
271 it("should change domain routing") {
273 val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
274 assertThat(messages).hasSize(1)
275 val firstMessage = messages[0]
277 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
278 .isEqualTo(HVMEAS_TOPIC)
279 assertThat(firstMessage.partition).describedAs("routed message partition")
283 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
// NOTE(review): size 2 after sending a single frame implies the returned list accumulates
// across calls made with the same sink - confirm. The newly routed message is at index 1.
285 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
286 assertThat(messagesAfterUpdate).hasSize(2)
287 val secondMessage = messagesAfterUpdate[1]
289 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
290 .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
291 assertThat(secondMessage.partition).describedAs("routed message partition")
// Each message is sent over its own handleConnection call; the routing is switched when the
// counter reaches messagesForEachTopic, so both topics are expected to receive five messages.
295 it("should update routing for each client sending one message") {
297 val messagesAmount = 10
298 val messagesForEachTopic = 5
300 Flux.range(0, messagesAmount).doOnNext {
301 if (it == messagesForEachTopic) {
302 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
305 sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
306 }.then().block(defaultTimeout)
// Counts are taken from everything the sink has recorded.
309 val messages = sink.sentMessages
310 val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
311 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
313 assertThat(messages.size).isEqualTo(messagesAmount)
// NOTE(review): the constant is used as the assertion subject and the measured counts as
// the expected values (reverse of the usual AssertJ order); equality is symmetric, so the
// check still holds, but a failure message would read backwards.
314 assertThat(messagesForEachTopic)
315 .describedAs("amount of messages routed to each topic")
316 .isEqualTo(firstTopicMessagesCount)
317 .isEqualTo(secondTopicMessagesCount)
// Here all frames travel over ONE connection (a single collector.handleConnection call);
// the configuration is changed while the stream is being produced, yet every message is
// expected on the original HVMEAS_TOPIC.
320 it("should not update routing for client sending continuous stream of messages") {
322 val messageStreamSize = 10
325 val incomingMessages = Flux.range(0, messageStreamSize)
328 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
// NOTE(review): debug output left in the test.
329 println("config changed")
332 .map { vesWireFrameMessage(HVMEAS) }
335 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
337 val messages = sink.sentMessages
338 val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
339 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
341 assertThat(messages.size).isEqualTo(messageStreamSize)
342 assertThat(firstTopicMessagesCount)
343 .describedAs("amount of messages routed to first topic")
344 .isEqualTo(messageStreamSize)
346 assertThat(secondTopicMessagesCount)
347 .describedAs("amount of messages routed to second topic")
// Health state is expected to be HEALTHY for this fixture.
351 it("should mark the application healthy") {
352 assertThat(sut.healthStateProvider.currentHealth)
353 .describedAs("application health state")
354 .isEqualTo(HealthDescription.HEALTHY)
// The configuration provider is told to throw on update before an update is triggered;
// the health state is then expected to be CONSUL_CONFIGURATION_NOT_FOUND.
358 given("failed configuration change") {
359 val (sut, _) = vesHvWithStoringSink()
360 sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
361 sut.configurationProvider.updateConfiguration(basicConfiguration)
363 it("should mark the application unhealthy ") {
364 assertThat(sut.healthStateProvider.currentHealth)
365 .describedAs("application health state")
366 .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
// Validation of incoming requests.
371 describe("request validation") {
// An oversized frame is placed between two valid HVMEAS frames; only the frame sent before
// it (event id "first") is expected to be handled, so the trailing valid frame is rejected too.
372 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
373 val (sut, sink) = vesHvWithStoringSink()
375 val handledMessages = sut.handleConnection(sink,
376 vesWireFrameMessage(HVMEAS, "first"),
377 vesMessageWithTooBigPayload(HVMEAS),
378 vesWireFrameMessage(HVMEAS))
380 assertThat(handledMessages).hasSize(1)
381 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
// Test fixture factory: creates a fresh StoringSink, applies basicConfiguration through the
// Sut's configuration provider and returns the (Sut, StoringSink) pair for destructuring.
387 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
388 val sink = StoringSink()
// Start every scenario from the basic routing configuration.
390 sut.configurationProvider.updateConfiguration(basicConfiguration)
391 return Pair(sut, sink)