d845f7c4d4c75be6b52afae51f29fb0c917db4dc
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.core.None
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
28 import org.onap.dcae.collectors.veshv.config.api.model.Routing
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
33 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
34 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
36 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
37 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
39 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
40 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
42 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
43 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
44
/**
 * Component-level specification of the VES High Volume collector.
 *
 * Each scenario obtains a fresh [Sut] / [StoringSink] pair from [vesHvWithStoringSink],
 * feeds wire-frame messages through `handleConnection` and then checks either the
 * messages that were returned or the `refCnt()` of the messages that were sent.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object VesHvSpecification : Spek({
    debugRx(false)

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val (collector, storingSink) = vesHvWithStoringSink()

            val firstEvent = vesWireFrameMessage(PERF3GPP)
            val secondEvent = vesWireFrameMessage(PERF3GPP)
            val delivered = collector.handleConnection(storingSink, firstEvent, secondEvent)

            assertThat(delivered).describedAs("should send all events").hasSize(2)
        }

        it("should create sink lazily") {
            val (collector, storingSink) = vesHvWithStoringSink()

            // when: a connection carrying no messages is handled and the collector is closed
            collector.handleConnection()
            collector.close().block()

            // then: the sink must not be reported as closed
            assertThat(storingSink.closed).isFalse()
        }

        it("should close sink when closing collector provider") {
            val (collector, storingSink) = vesHvWithStoringSink()

            // given: a "valid" ves message has been sent, because the StoringSink
            // is (hopefully) created lazily and would otherwise not be initialized
            val validEvent = vesWireFrameMessage(PERF3GPP)
            collector.handleConnection(validEvent)

            // when
            collector.close().block()

            // then
            assertThat(storingSink.closed).isTrue()
        }
    }

    describe("Memory management") {
        it("should release memory for each handled and dropped message") {
            val (collector, storingSink) = vesHvWithStoringSink()
            val releasedRefCnt = 0
            val wellFormed = vesWireFrameMessage(PERF3GPP)
            val brokenHeader = messageWithInvalidWireFrameHeader()
            val oversized = messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)

            val handled = collector.handleConnection(storingSink, wellFormed, brokenHeader, oversized)

            // only one of the three messages is expected to be handled
            assertThat(handled).hasSize(1)

            assertThat(wellFormed.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(releasedRefCnt)
            assertThat(brokenHeader.refCnt())
                    .describedAs("message with invalid frame should be released")
                    .isEqualTo(releasedRefCnt)
            assertThat(oversized.refCnt())
                    .describedAs("message with payload exceeding 1MiB should be released")
                    .isEqualTo(releasedRefCnt)
        }

        it("should release memory for each message with invalid payload") {
            val (collector, storingSink) = vesHvWithStoringSink()
            val releasedRefCnt = 0
            val wellFormed = vesWireFrameMessage(PERF3GPP)
            val brokenPayload = wireFrameMessageWithInvalidPayload()

            val handled = collector.handleConnection(storingSink, wellFormed, brokenPayload)

            assertThat(handled).hasSize(1)

            assertThat(wellFormed.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(releasedRefCnt)
            assertThat(brokenPayload.refCnt())
                    .describedAs("message with invalid payload should be released")
                    .isEqualTo(releasedRefCnt)
        }

        it("should release memory for each message with garbage frame") {
            val (collector, storingSink) = vesHvWithStoringSink()
            val releasedRefCnt = 0
            val wellFormed = vesWireFrameMessage(PERF3GPP)
            val garbage = garbageFrame()

            val handled = collector.handleConnection(storingSink, wellFormed, garbage)

            assertThat(handled).hasSize(1)

            assertThat(wellFormed.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(releasedRefCnt)
            assertThat(garbage.refCnt())
                    .describedAs("message with garbage frame should be released")
                    .isEqualTo(releasedRefCnt)
        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val (collector, storingSink) = vesHvWithStoringSink()

            val routed = collector.handleConnection(storingSink, vesWireFrameMessage(PERF3GPP))
            assertThat(routed).describedAs("number of routed messages").hasSize(1)

            // safe after the size assertion above
            val onlyMessage = routed.first()
            assertThat(onlyMessage.targetTopic)
                    .describedAs("routed message topic")
                    .isEqualTo(PERF3GPP_TOPIC)
            assertThat(onlyMessage.partition)
                    .describedAs("routed message partition")
                    .isEqualTo(None)
        }

        it("should be able to direct 2 messages from different domains to one topic") {
            val (collector, storingSink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)

            val perf3gppEvent = vesWireFrameMessage(PERF3GPP)
            val heartbeatEvent = vesWireFrameMessage(HEARTBEAT)
            val measurementEvent = vesWireFrameMessage(MEASUREMENT)
            val routed = collector.handleConnection(
                    storingSink, perf3gppEvent, heartbeatEvent, measurementEvent)

            assertThat(routed).describedAs("number of routed messages").hasSize(3)

            // exactly three elements are present at this point, so destructuring cannot fail
            val (first, second, last) = routed

            assertThat(first.targetTopic)
                    .describedAs("first message topic")
                    .isEqualTo(PERF3GPP_TOPIC)

            assertThat(second.targetTopic)
                    .describedAs("second message topic")
                    .isEqualTo(PERF3GPP_TOPIC)

            assertThat(last.targetTopic)
                    .describedAs("last message topic")
                    .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
        }

        it("should drop message if route was not found") {
            val (collector, storingSink) = vesHvWithStoringSink()

            val otherEvent = vesWireFrameMessage(OTHER, "first")
            val perf3gppEvent = vesWireFrameMessage(PERF3GPP, "second")
            val heartbeatEvent = vesWireFrameMessage(HEARTBEAT, "third")
            val routed = collector.handleConnection(
                    storingSink, otherEvent, perf3gppEvent, heartbeatEvent)

            assertThat(routed).describedAs("number of routed messages").hasSize(1)

            // the single routed message is expected to be the PERF3GPP one ("second")
            val survivor = routed.first()
            assertThat(survivor.targetTopic)
                    .describedAs("routed message topic")
                    .isEqualTo(PERF3GPP_TOPIC)
            assertThat(survivor.message.header.eventId)
                    .describedAs("routed message eventId")
                    .isEqualTo("second")
        }
    }

    describe("request validation") {
        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
            val (collector, storingSink) = vesHvWithStoringSink()

            val acceptedEvent = vesWireFrameMessage(PERF3GPP, "first")
            val oversizedEvent = messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
            val trailingEvent = vesWireFrameMessage(PERF3GPP)
            val handled = collector.handleConnection(
                    storingSink, acceptedEvent, oversizedEvent, trailingEvent)

            // only the message sent before the oversized one is expected to be handled
            assertThat(handled).hasSize(1)
            assertThat(handled[0].message.header.eventId).isEqualTo("first")
        }
    }
})
215
/**
 * Creates a collector under test together with the sink it was constructed with.
 *
 * @param routing routing placed in the [CollectorConfiguration]; defaults to [basicRouting]
 * @return the [Sut] paired with the [StoringSink] that was passed to its constructor
 */
private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
    val storingSink = StoringSink()
    val configuration = CollectorConfiguration(routing)
    return Sut(configuration, storingSink) to storingSink
}