6a718eeae81a30ad8848200a48edb2927aab3f2d
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.core.None
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
29 import org.onap.dcae.collectors.veshv.config.api.model.Routing
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
32 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
33 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
34 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
35 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
36 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
37 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
38 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
40 import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
41 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
42 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
43 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
44 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
45 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
46 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
47 import reactor.core.publisher.Flux
48 import java.time.Duration
49
/**
 * Component-level specification of the VES High Volume collector.
 *
 * Each case builds a fresh collector/sink pair through [vesHvWithStoringSink], pushes wire frames
 * through `handleConnection` and then inspects the messages handed back, the sink state or the
 * reference counts of the input buffers.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object VesHvSpecification : Spek({
    debugRx(false)

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val (collector, storingSink) = vesHvWithStoringSink()

            val forwarded = collector.handleConnection(
                    storingSink,
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(PERF3GPP))

            assertThat(forwarded).describedAs("should send all events").hasSize(2)
        }

        it("should create sink lazily") {
            val (collector, storingSink) = vesHvWithStoringSink()

            // when: a connection is opened without any message and the collector is shut down
            collector.handleConnection()
            collector.close().unsafeRunSync()

            // then: the sink was never closed
            assertThat(storingSink.closed).isFalse()
        }

        it("should close sink when closing collector provider") {
            val (collector, storingSink) = vesHvWithStoringSink()

            // given: a "valid" VES message has been sent, so that the (hopefully lazily created)
            // StoringSink is initialized
            collector.handleConnection(vesWireFrameMessage(PERF3GPP))

            // when
            collector.close().unsafeRunSync()

            // then
            assertThat(storingSink.closed).isTrue()
        }
    }

    describe("Memory management") {
        // Reference count every input buffer is expected to have once the connection was handled.
        val releasedRefCnt = 0

        it("should release memory for each handled and dropped message") {
            val (collector, storingSink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
            // payload one byte over the limit exposed by Sut
            val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)

            val handledEvents = collector.handleConnection(
                    storingSink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)

            assertThat(handledEvents).hasSize(1)

            listOf(
                    validMessage to "handled message should be released",
                    msgWithInvalidFrame to "message with invalid frame should be released",
                    msgWithTooBigPayload to "message with payload exceeding 1MiB should be released"
            ).forEach { (buffer, expectation) ->
                assertThat(buffer.refCnt())
                        .describedAs(expectation)
                        .isEqualTo(releasedRefCnt)
            }
        }

        it("should release memory for each message with invalid payload") {
            val (collector, storingSink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()

            val handledEvents = collector.handleConnection(storingSink, validMessage, msgWithInvalidPayload)

            assertThat(handledEvents).hasSize(1)

            listOf(
                    validMessage to "handled message should be released",
                    msgWithInvalidPayload to "message with invalid payload should be released"
            ).forEach { (buffer, expectation) ->
                assertThat(buffer.refCnt())
                        .describedAs(expectation)
                        .isEqualTo(releasedRefCnt)
            }
        }

        it("should release memory for each message with garbage frame") {
            val (collector, storingSink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithGarbageFrame = garbageFrame()

            val handledEvents = collector.handleConnection(storingSink, validMessage, msgWithGarbageFrame)

            assertThat(handledEvents).hasSize(1)

            listOf(
                    validMessage to "handled message should be released",
                    msgWithGarbageFrame to "message with garbage frame should be released"
            ).forEach { (buffer, expectation) ->
                assertThat(buffer.refCnt())
                        .describedAs(expectation)
                        .isEqualTo(releasedRefCnt)
            }
        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val (collector, storingSink) = vesHvWithStoringSink()

            val routed = collector.handleConnection(storingSink, vesWireFrameMessage(PERF3GPP))

            assertThat(routed).describedAs("number of routed messages").hasSize(1)

            val onlyMessage = routed[0]
            assertThat(onlyMessage.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
            assertThat(onlyMessage.partition).describedAs("routed message partition").isEqualTo(None)
        }

        it("should be able to direct 2 messages from different domains to one topic") {
            val (collector, storingSink) = vesHvWithStoringSink(twoDomainsToOneTopicRouting)

            val routed = collector.handleConnection(
                    storingSink,
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(HEARTBEAT),
                    vesWireFrameMessage(MEASUREMENT))

            assertThat(routed).describedAs("number of routed messages").hasSize(3)

            // Expected target topic for each routed message, in the order the frames were sent.
            val expectedTopics = listOf(
                    "first message topic" to PERF3GPP_TOPIC,
                    "second message topic" to PERF3GPP_TOPIC,
                    "last message topic" to ALTERNATE_PERF3GPP_TOPIC)

            expectedTopics.forEachIndexed { index, (expectation, topic) ->
                assertThat(routed[index].targetTopic)
                        .describedAs(expectation)
                        .isEqualTo(topic)
            }
        }

        it("should drop message if route was not found") {
            val (collector, storingSink) = vesHvWithStoringSink()

            val routed = collector.handleConnection(
                    storingSink,
                    vesWireFrameMessage(OTHER, "first"),
                    vesWireFrameMessage(PERF3GPP, "second"),
                    vesWireFrameMessage(HEARTBEAT, "third"))

            assertThat(routed).describedAs("number of routed messages").hasSize(1)

            val survivor = routed[0]
            assertThat(survivor.targetTopic)
                    .describedAs("routed message topic")
                    .isEqualTo(PERF3GPP_TOPIC)
            assertThat(survivor.message.header.eventId)
                    .describedAs("routed message eventId")
                    .isEqualTo("second")
        }
    }

    describe("request validation") {
        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
            val (collector, storingSink) = vesHvWithStoringSink()
            val oversizedMessage = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)

            val handledMessages = collector.handleConnection(
                    storingSink,
                    vesWireFrameMessage(PERF3GPP, "first"),
                    oversizedMessage,
                    vesWireFrameMessage(PERF3GPP))

            assertThat(handledMessages).hasSize(1)
            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
        }
    }

})
220
/**
 * Creates a [Sut] wired to a brand new [StoringSink].
 *
 * @param routing routing handed to the [CollectorConfiguration] of the created [Sut];
 * defaults to [basicRouting]
 * @return the created [Sut] paired with the [StoringSink] instance it was constructed with
 */
private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> =
        StoringSink().let { storingSink ->
            Sut(CollectorConfiguration(routing), storingSink) to storingSink
        }