21c5c189d6f1978708fe1e7c2cf6eedc0cbecba1
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
37 import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
43 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
45 import reactor.core.publisher.Flux
46 import java.time.Duration
47
48 /**
49  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
50  * @since May 2018
51  */
52 object VesHvSpecification : Spek({
53     debugRx(false)
54
55     describe("VES High Volume Collector") {
56         it("should handle multiple HV RAN events") {
57             val (sut, sink) = vesHvWithStoringSink()
58             val messages = sut.handleConnection(sink,
59                     vesWireFrameMessage(PERF3GPP),
60                     vesWireFrameMessage(PERF3GPP)
61             )
62
63             assertThat(messages)
64                     .describedAs("should send all events")
65                     .hasSize(2)
66         }
67
68         it("should close sink when closing collector provider") {
69             val (sut, _) = vesHvWithStoringSink()
70
71             sut.close()
72
73             assertThat(sut.sinkProvider.closed).isTrue()
74         }
75     }
76
77     describe("Memory management") {
78         it("should release memory for each handled and dropped message") {
79             val (sut, sink) = vesHvWithStoringSink()
80             val validMessage = vesWireFrameMessage(PERF3GPP)
81             val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
82             val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
83             val expectedRefCnt = 0
84
85             val handledEvents = sut.handleConnection(
86                     sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
87
88             assertThat(handledEvents).hasSize(1)
89
90             assertThat(validMessage.refCnt())
91                     .describedAs("handled message should be released")
92                     .isEqualTo(expectedRefCnt)
93             assertThat(msgWithInvalidFrame.refCnt())
94                     .describedAs("message with invalid frame should be released")
95                     .isEqualTo(expectedRefCnt)
96             assertThat(msgWithTooBigPayload.refCnt())
97                     .describedAs("message with payload exceeding 1MiB should be released")
98                     .isEqualTo(expectedRefCnt)
99         }
100
101         it("should release memory for each message with invalid payload") {
102             val (sut, sink) = vesHvWithStoringSink()
103             val validMessage = vesWireFrameMessage(PERF3GPP)
104             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
105             val expectedRefCnt = 0
106
107             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
108
109             assertThat(handledEvents).hasSize(1)
110
111             assertThat(validMessage.refCnt())
112                     .describedAs("handled message should be released")
113                     .isEqualTo(expectedRefCnt)
114             assertThat(msgWithInvalidPayload.refCnt())
115                     .describedAs("message with invalid payload should be released")
116                     .isEqualTo(expectedRefCnt)
117
118         }
119
120         it("should release memory for each message with garbage frame") {
121             val (sut, sink) = vesHvWithStoringSink()
122             val validMessage = vesWireFrameMessage(PERF3GPP)
123             val msgWithGarbageFrame = garbageFrame()
124             val expectedRefCnt = 0
125
126             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
127
128             assertThat(handledEvents).hasSize(1)
129
130             assertThat(validMessage.refCnt())
131                     .describedAs("handled message should be released")
132                     .isEqualTo(expectedRefCnt)
133             assertThat(msgWithGarbageFrame.refCnt())
134                     .describedAs("message with garbage frame should be released")
135                     .isEqualTo(expectedRefCnt)
136
137         }
138     }
139
140     describe("message routing") {
141         it("should direct message to a topic by means of routing configuration") {
142             val (sut, sink) = vesHvWithStoringSink()
143
144             val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
145             assertThat(messages).describedAs("number of routed messages").hasSize(1)
146
147             val msg = messages[0]
148             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
149             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
150         }
151
152         it("should be able to direct 2 messages from different domains to one topic") {
153             val (sut, sink) = vesHvWithStoringSink()
154
155             sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
156
157             val messages = sut.handleConnection(sink,
158                     vesWireFrameMessage(PERF3GPP),
159                     vesWireFrameMessage(HEARTBEAT),
160                     vesWireFrameMessage(MEASUREMENT))
161
162             assertThat(messages).describedAs("number of routed messages").hasSize(3)
163
164             assertThat(messages[0].topic).describedAs("first message topic")
165                     .isEqualTo(PERF3GPP_TOPIC)
166
167             assertThat(messages[1].topic).describedAs("second message topic")
168                     .isEqualTo(PERF3GPP_TOPIC)
169
170             assertThat(messages[2].topic).describedAs("last message topic")
171                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
172         }
173
174         it("should drop message if route was not found") {
175             val (sut, sink) = vesHvWithStoringSink()
176             val messages = sut.handleConnection(sink,
177                     vesWireFrameMessage(OTHER, "first"),
178                     vesWireFrameMessage(PERF3GPP, "second"),
179                     vesWireFrameMessage(HEARTBEAT, "third"))
180
181             assertThat(messages).describedAs("number of routed messages").hasSize(1)
182
183             val msg = messages[0]
184             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
185             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
186         }
187     }
188
189     describe("configuration update") {
190
191         val defaultTimeout = Duration.ofSeconds(10)
192
193         given("successful configuration change") {
194
195             lateinit var sut: Sut
196             lateinit var sink: StoringSink
197
198             beforeEachTest {
199                 vesHvWithStoringSink().run {
200                     sut = first
201                     sink = second
202                 }
203             }
204
205             it("should update collector") {
206                 val firstCollector = sut.collector
207
208                 sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
209                 val collectorAfterUpdate = sut.collector
210
211                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
212             }
213
214             it("should start routing messages") {
215
216                 sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
217
218                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
219                 assertThat(messages).isEmpty()
220
221                 sut.configurationProvider.updateConfiguration(configWithBasicRouting)
222
223                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
224                 assertThat(messagesAfterUpdate).hasSize(1)
225                 val message = messagesAfterUpdate[0]
226
227                 assertThat(message.topic).describedAs("routed message topic after configuration's change")
228                         .isEqualTo(PERF3GPP_TOPIC)
229                 assertThat(message.partition).describedAs("routed message partition")
230                         .isEqualTo(0)
231             }
232
233             it("should change domain routing") {
234
235                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
236                 assertThat(messages).hasSize(1)
237                 val firstMessage = messages[0]
238
239                 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
240                         .isEqualTo(PERF3GPP_TOPIC)
241                 assertThat(firstMessage.partition).describedAs("routed message partition")
242                         .isEqualTo(0)
243
244
245                 sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
246
247                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
248                 assertThat(messagesAfterUpdate).hasSize(2)
249                 val secondMessage = messagesAfterUpdate[1]
250
251                 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
252                         .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
253                 assertThat(secondMessage.partition).describedAs("routed message partition")
254                         .isEqualTo(0)
255             }
256
257             it("should update routing for each client sending one message") {
258
259                 val messagesAmount = 10
260                 val messagesForEachTopic = 5
261
262                 Flux.range(0, messagesAmount).doOnNext {
263                     if (it == messagesForEachTopic) {
264                         sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
265                     }
266                 }.doOnNext {
267                     sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
268                 }.then().block(defaultTimeout)
269
270
271                 val messages = sink.sentMessages
272                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
273                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
274
275                 assertThat(messages.size).isEqualTo(messagesAmount)
276                 assertThat(messagesForEachTopic)
277                         .describedAs("amount of messages routed to each topic")
278                         .isEqualTo(firstTopicMessagesCount)
279                         .isEqualTo(secondTopicMessagesCount)
280             }
281
282             it("should not update routing for client sending continuous stream of messages") {
283
284                 val messageStreamSize = 10
285                 val pivot = 5
286
287                 val incomingMessages = Flux.range(0, messageStreamSize)
288                         .doOnNext {
289                             if (it == pivot) {
290                                 sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
291                                 println("config changed")
292                             }
293                         }
294                         .map { vesWireFrameMessage(PERF3GPP) }
295
296
297                 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
298
299                 val messages = sink.sentMessages
300                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
301                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
302
303                 assertThat(messages.size).isEqualTo(messageStreamSize)
304                 assertThat(firstTopicMessagesCount)
305                         .describedAs("amount of messages routed to first topic")
306                         .isEqualTo(messageStreamSize)
307
308                 assertThat(secondTopicMessagesCount)
309                         .describedAs("amount of messages routed to second topic")
310                         .isEqualTo(0)
311             }
312
313             it("should mark the application healthy") {
314                 assertThat(sut.healthStateProvider.currentHealth)
315                         .describedAs("application health state")
316                         .isEqualTo(HealthDescription.HEALTHY)
317             }
318         }
319
320         given("failed configuration change") {
321             val (sut, _) = vesHvWithStoringSink()
322             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
323             sut.configurationProvider.updateConfiguration(configWithBasicRouting)
324
325             it("should mark the application unhealthy ") {
326                 assertThat(sut.healthStateProvider.currentHealth)
327                         .describedAs("application health state")
328                         .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
329             }
330         }
331     }
332
333     describe("request validation") {
334         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
335             val (sut, sink) = vesHvWithStoringSink()
336
337             val handledMessages = sut.handleConnection(sink,
338                     vesWireFrameMessage(PERF3GPP, "first"),
339                     messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
340                     vesWireFrameMessage(PERF3GPP))
341
342             assertThat(handledMessages).hasSize(1)
343             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
344         }
345     }
346
347 })
348
349 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
350     val sink = StoringSink()
351     val sut = Sut(sink)
352     sut.configurationProvider.updateConfiguration(configWithBasicRouting)
353     return Pair(sut, sink)
354 }