5d215fc5ae3634c0c63bde0a8948076ecb36c79d
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018-2019 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import arrow.core.None
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.given
27 import org.jetbrains.spek.api.dsl.it
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
31 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
32 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
33 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
37 import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
42 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
43 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
45 import reactor.core.publisher.Flux
46 import java.time.Duration
47
48 /**
49  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
50  * @since May 2018
51  */
52 object VesHvSpecification : Spek({
53     debugRx(false)
54
55     describe("VES High Volume Collector") {
56         it("should handle multiple HV RAN events") {
57             val (sut, sink) = vesHvWithStoringSink()
58             val messages = sut.handleConnection(sink,
59                     vesWireFrameMessage(PERF3GPP),
60                     vesWireFrameMessage(PERF3GPP)
61             )
62
63             assertThat(messages)
64                     .describedAs("should send all events")
65                     .hasSize(2)
66         }
67
68         it("should create sink lazily") {
69             val (sut, sink) = vesHvWithStoringSink()
70
71             // just connecting should not create sink
72             sut.handleConnection()
73             sut.close().unsafeRunSync()
74
75             // then
76             assertThat(sink.closed).isFalse()
77         }
78
79         it("should close sink when closing collector provider") {
80             val (sut, sink) = vesHvWithStoringSink()
81             // given Sink initialized
82             // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
83             sut.handleConnection(vesWireFrameMessage(PERF3GPP))
84
85             // when
86             sut.close().unsafeRunSync()
87
88             // then
89             assertThat(sink.closed).isTrue()
90         }
91     }
92
93     describe("Memory management") {
94         it("should release memory for each handled and dropped message") {
95             val (sut, sink) = vesHvWithStoringSink()
96             val validMessage = vesWireFrameMessage(PERF3GPP)
97             val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
98             val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
99             val expectedRefCnt = 0
100
101             val handledEvents = sut.handleConnection(
102                     sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
103
104             assertThat(handledEvents).hasSize(1)
105
106             assertThat(validMessage.refCnt())
107                     .describedAs("handled message should be released")
108                     .isEqualTo(expectedRefCnt)
109             assertThat(msgWithInvalidFrame.refCnt())
110                     .describedAs("message with invalid frame should be released")
111                     .isEqualTo(expectedRefCnt)
112             assertThat(msgWithTooBigPayload.refCnt())
113                     .describedAs("message with payload exceeding 1MiB should be released")
114                     .isEqualTo(expectedRefCnt)
115         }
116
117         it("should release memory for each message with invalid payload") {
118             val (sut, sink) = vesHvWithStoringSink()
119             val validMessage = vesWireFrameMessage(PERF3GPP)
120             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
121             val expectedRefCnt = 0
122
123             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
124
125             assertThat(handledEvents).hasSize(1)
126
127             assertThat(validMessage.refCnt())
128                     .describedAs("handled message should be released")
129                     .isEqualTo(expectedRefCnt)
130             assertThat(msgWithInvalidPayload.refCnt())
131                     .describedAs("message with invalid payload should be released")
132                     .isEqualTo(expectedRefCnt)
133
134         }
135
136         it("should release memory for each message with garbage frame") {
137             val (sut, sink) = vesHvWithStoringSink()
138             val validMessage = vesWireFrameMessage(PERF3GPP)
139             val msgWithGarbageFrame = garbageFrame()
140             val expectedRefCnt = 0
141
142             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
143
144             assertThat(handledEvents).hasSize(1)
145
146             assertThat(validMessage.refCnt())
147                     .describedAs("handled message should be released")
148                     .isEqualTo(expectedRefCnt)
149             assertThat(msgWithGarbageFrame.refCnt())
150                     .describedAs("message with garbage frame should be released")
151                     .isEqualTo(expectedRefCnt)
152
153         }
154     }
155
156     describe("message routing") {
157         it("should direct message to a topic by means of routing configuration") {
158             val (sut, sink) = vesHvWithStoringSink()
159
160             val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
161             assertThat(messages).describedAs("number of routed messages").hasSize(1)
162
163             val msg = messages[0]
164             assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
165             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
166         }
167
168         it("should be able to direct 2 messages from different domains to one topic") {
169             val (sut, sink) = vesHvWithStoringSink()
170
171             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
172
173             val messages = sut.handleConnection(sink,
174                     vesWireFrameMessage(PERF3GPP),
175                     vesWireFrameMessage(HEARTBEAT),
176                     vesWireFrameMessage(MEASUREMENT))
177
178             assertThat(messages).describedAs("number of routed messages").hasSize(3)
179
180             assertThat(messages[0].targetTopic).describedAs("first message topic")
181                     .isEqualTo(PERF3GPP_TOPIC)
182
183             assertThat(messages[1].targetTopic).describedAs("second message topic")
184                     .isEqualTo(PERF3GPP_TOPIC)
185
186             assertThat(messages[2].targetTopic).describedAs("last message topic")
187                     .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
188         }
189
190         it("should drop message if route was not found") {
191             val (sut, sink) = vesHvWithStoringSink()
192             val messages = sut.handleConnection(sink,
193                     vesWireFrameMessage(OTHER, "first"),
194                     vesWireFrameMessage(PERF3GPP, "second"),
195                     vesWireFrameMessage(HEARTBEAT, "third"))
196
197             assertThat(messages).describedAs("number of routed messages").hasSize(1)
198
199             val msg = messages[0]
200             assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
201             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
202         }
203     }
204
205     describe("configuration update") {
206
207         val defaultTimeout = Duration.ofSeconds(10)
208
209         given("successful configuration change") {
210
211             lateinit var sut: Sut
212             lateinit var sink: StoringSink
213
214             beforeEachTest {
215                 vesHvWithStoringSink().run {
216                     sut = first
217                     sink = second
218                 }
219             }
220
221             it("should update collector") {
222                 val firstCollector = sut.collector
223
224                 sut.configurationProvider.updateConfiguration(alternativeRouting)
225                 val collectorAfterUpdate = sut.collector
226
227                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
228             }
229
230             it("should start routing messages") {
231
232                 sut.configurationProvider.updateConfiguration(emptyRouting)
233
234                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
235                 assertThat(messages).isEmpty()
236
237                 sut.configurationProvider.updateConfiguration(basicRouting)
238
239                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
240                 assertThat(messagesAfterUpdate).hasSize(1)
241                 val message = messagesAfterUpdate[0]
242
243                 assertThat(message.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
244                         .isEqualTo(PERF3GPP_TOPIC)
245                 assertThat(message.partition).describedAs("routed message partition")
246                         .isEqualTo(None)
247             }
248
249             it("should change domain routing") {
250
251                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
252                 assertThat(messages).hasSize(1)
253                 val firstMessage = messages[0]
254
255                 assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
256                         .isEqualTo(PERF3GPP_TOPIC)
257                 assertThat(firstMessage.partition).describedAs("routed message partition")
258                         .isEqualTo(None)
259
260
261                 sut.configurationProvider.updateConfiguration(alternativeRouting)
262
263                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
264                 assertThat(messagesAfterUpdate).hasSize(2)
265                 val secondMessage = messagesAfterUpdate[1]
266
267                 assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration'PERF3GPP_REGIONAL change")
268                         .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
269                 assertThat(secondMessage.partition).describedAs("routed message partition")
270                         .isEqualTo(None)
271             }
272
273             it("should update routing for each client sending one message") {
274
275                 val messagesAmount = 10
276                 val messagesForEachTopic = 5
277
278                 Flux.range(0, messagesAmount).doOnNext {
279                     if (it == messagesForEachTopic) {
280                         sut.configurationProvider.updateConfiguration(alternativeRouting)
281                     }
282                 }.doOnNext {
283                     sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
284                 }.then().block(defaultTimeout)
285
286
287                 val messages = sink.sentMessages
288                 val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
289                 val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
290
291                 assertThat(messages.size).isEqualTo(messagesAmount)
292                 assertThat(messagesForEachTopic)
293                         .describedAs("amount of messages routed to each topic")
294                         .isEqualTo(firstTopicMessagesCount)
295                         .isEqualTo(secondTopicMessagesCount)
296             }
297
298             it("should not update routing for client sending continuous stream of messages") {
299
300                 val messageStreamSize = 10
301                 val pivot = 5
302
303                 val incomingMessages = Flux.range(0, messageStreamSize)
304                         .doOnNext {
305                             if (it == pivot) {
306                                 sut.configurationProvider.updateConfiguration(alternativeRouting)
307                                 println("config changed")
308                             }
309                         }
310                         .map { vesWireFrameMessage(PERF3GPP) }
311
312
313                 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
314
315                 val messages = sink.sentMessages
316                 val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
317                 val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
318
319                 assertThat(messages.size).isEqualTo(messageStreamSize)
320                 assertThat(firstTopicMessagesCount)
321                         .describedAs("amount of messages routed to first topic")
322                         .isEqualTo(messageStreamSize)
323
324                 assertThat(secondTopicMessagesCount)
325                         .describedAs("amount of messages routed to second topic")
326                         .isEqualTo(0)
327             }
328
329             it("should mark the application healthy") {
330                 assertThat(sut.healthStateProvider.currentHealth)
331                         .describedAs("application health state")
332                         .isEqualTo(HealthDescription.HEALTHY)
333             }
334         }
335
336         given("failed configuration change") {
337             val (sut, _) = vesHvWithStoringSink()
338             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
339             sut.configurationProvider.updateConfiguration(basicRouting)
340
341             it("should mark the application unhealthy ") {
342                 assertThat(sut.healthStateProvider.currentHealth)
343                         .describedAs("application health state")
344                         .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
345             }
346         }
347     }
348
349     describe("request validation") {
350         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
351             val (sut, sink) = vesHvWithStoringSink()
352
353             val handledMessages = sut.handleConnection(sink,
354                     vesWireFrameMessage(PERF3GPP, "first"),
355                     messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
356                     vesWireFrameMessage(PERF3GPP))
357
358             assertThat(handledMessages).hasSize(1)
359             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
360         }
361     }
362
363 })
364
365 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
366     val sink = StoringSink()
367     val sut = Sut(sink)
368     sut.configurationProvider.updateConfiguration(basicRouting)
369     return Pair(sut, sink)
370 }