2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.tests.component
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
27 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
28 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
29 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
32 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
35 object VesHvSpecification : Spek({
38 describe("VES High Volume Collector") {
39 it("should handle multiple HV RAN events") {
40 val sink = StoringSink()
42 sut.configurationProvider.updateConfiguration(basicConfiguration)
43 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
46 .describedAs("should send all events")
51 describe("Memory management") {
53 it("should release memory for each handled and dropped message") {
54 val sink = StoringSink()
56 sut.configurationProvider.updateConfiguration(basicConfiguration)
57 val validMessage = vesMessage(Domain.HVRANMEAS)
58 val msgWithInvalidDomain = vesMessage(Domain.OTHER)
59 val msgWithInvalidFrame = invalidWireFrame()
60 val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
61 val expectedRefCnt = 0
63 val handledEvents = sut.handleConnection(
64 sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
66 assertThat(handledEvents).hasSize(1)
68 assertThat(validMessage.refCnt())
69 .describedAs("handled message should be released")
70 .isEqualTo(expectedRefCnt)
71 assertThat(msgWithInvalidDomain.refCnt())
72 .describedAs("message with invalid domain should be released")
73 .isEqualTo(expectedRefCnt)
74 assertThat(msgWithInvalidFrame.refCnt())
75 .describedAs("message with invalid frame should be released")
76 .isEqualTo(expectedRefCnt)
77 assertThat(msgWithTooBigPayload.refCnt())
78 .describedAs("message with payload exceeding 1MiB should be released")
79 .isEqualTo(expectedRefCnt)
83 it("should release memory for each message with invalid payload") {
84 val sink = StoringSink()
86 sut.configurationProvider.updateConfiguration(basicConfiguration)
87 val validMessage = vesMessage(Domain.HVRANMEAS)
88 val msgWithInvalidPayload = invalidVesMessage()
89 val expectedRefCnt = 0
91 val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
93 assertThat(handledEvents).hasSize(1)
95 assertThat(validMessage.refCnt())
96 .describedAs("handled message should be released")
97 .isEqualTo(expectedRefCnt)
98 assertThat(msgWithInvalidPayload.refCnt())
99 .describedAs("message with invalid payload should be released")
100 .isEqualTo(expectedRefCnt)
104 it("should release memory for each message with garbage frame") {
105 val sink = StoringSink()
107 sut.configurationProvider.updateConfiguration(basicConfiguration)
108 val validMessage = vesMessage(Domain.HVRANMEAS)
109 val msgWithGarbageFrame = garbageFrame()
110 val expectedRefCnt = 0
112 val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
114 assertThat(handledEvents).hasSize(1)
116 assertThat(validMessage.refCnt())
117 .describedAs("handled message should be released")
118 .isEqualTo(expectedRefCnt)
119 assertThat(msgWithGarbageFrame.refCnt())
120 .describedAs("message with garbage frame should be released")
121 .isEqualTo(expectedRefCnt)
126 describe("message routing") {
127 it("should direct message to a topic by means of routing configuration") {
128 val sink = StoringSink()
130 sut.configurationProvider.updateConfiguration(basicConfiguration)
132 val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
133 assertThat(messages).describedAs("number of routed messages").hasSize(1)
135 val msg = messages[0]
136 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
137 assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
140 it("should drop message if route was not found") {
141 val sink = StoringSink()
143 sut.configurationProvider.updateConfiguration(basicConfiguration)
144 val messages = sut.handleConnection(sink,
145 vesMessage(Domain.OTHER, "first"),
146 vesMessage(Domain.HVRANMEAS, "second"),
147 vesMessage(Domain.HEARTBEAT, "third"))
149 assertThat(messages).describedAs("number of routed messages").hasSize(1)
151 val msg = messages[0]
152 assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
153 assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
157 describe("request validation") {
158 it("should reject message with payload greater than 1 MiB and all subsequent messages") {
159 val sink = StoringSink()
161 sut.configurationProvider.updateConfiguration(basicConfiguration)
163 val handledMessages = sut.handleConnection(sink,
164 vesMessage(Domain.HVRANMEAS, "first"),
165 vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
166 vesMessage(Domain.HVRANMEAS, "third"))
168 assertThat(handledMessages).hasSize(1)
169 assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")