ed46b11972d197c75e8b4a3a5c70bf377ec89290
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.*
41
42 import reactor.core.publisher.Flux
43 import java.time.Duration
44
/**
 * Component-level specification of the VES High Volume collector.
 *
 * Each scenario obtains a [Sut] wired to a [StoringSink] from `vesHvWithStoringSink()`, feeds
 * wire-frame messages through it and asserts on the messages that reached the sink, on the
 * reference counts of the input buffers, or on the reported health state.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object VesHvSpecification : Spek({
    debugRx(false)

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(sink,
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(PERF3GPP)
            )

            assertThat(messages)
                    .describedAs("should send all events")
                    .hasSize(2)
        }

        it("should close sink when closing collector provider") {
            val (sut, _) = vesHvWithStoringSink()

            sut.close()

            assertThat(sut.sinkProvider.closed).isTrue()
        }
    }

    describe("Memory management") {
        // Every input buffer is expected to end up with a reference count of 0,
        // regardless of whether the message was routed or dropped.

        it("should release memory for each handled and dropped message") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
            // one byte over the maximum payload size accepted by the system under test
            val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(
                    sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)

            // only the valid message is expected to reach the sink
            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidFrame.refCnt())
                    .describedAs("message with invalid frame should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithTooBigPayload.refCnt())
                    .describedAs("message with payload exceeding 1MiB should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with invalid payload") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidPayload.refCnt())
                    .describedAs("message with invalid payload should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with garbage frame") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithGarbageFrame = garbageFrame()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithGarbageFrame.refCnt())
                    .describedAs("message with garbage frame should be released")
                    .isEqualTo(expectedRefCnt)
        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val (sut, sink) = vesHvWithStoringSink()

            val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
        }

        it("should be able to direct 2 messages from different domains to one topic") {
            val (sut, sink) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)

            val messages = sut.handleConnection(sink,
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(HEARTBEAT),
                    vesWireFrameMessage(MEASUREMENT))

            assertThat(messages).describedAs("number of routed messages").hasSize(3)

            // PERF3GPP and HEARTBEAT are expected on the same topic, MEASUREMENT on its own
            assertThat(messages[0].topic).describedAs("first message topic")
                    .isEqualTo(PERF3GPP_TOPIC)

            assertThat(messages[1].topic).describedAs("second message topic")
                    .isEqualTo(PERF3GPP_TOPIC)

            assertThat(messages[2].topic).describedAs("last message topic")
                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
        }

        it("should drop message if route was not found") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(sink,
                    vesWireFrameMessage(OTHER, "first"),
                    vesWireFrameMessage(PERF3GPP, "second"),
                    vesWireFrameMessage(HEARTBEAT, "third"))

            // of the three events only the PERF3GPP one ("second") is expected to be routed
            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
            assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
        }
    }

    describe("configuration update") {

        // upper bound for blocking on the reactive pipelines used below
        val defaultTimeout = Duration.ofSeconds(10)

        given("successful configuration change") {

            lateinit var sut: Sut
            lateinit var sink: StoringSink

            // fresh system under test and sink for every test in this group
            beforeEachTest {
                vesHvWithStoringSink().run {
                    sut = first
                    sink = second
                }
            }

            it("should update collector") {
                val firstCollector = sut.collector

                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                val collectorAfterUpdate = sut.collector

                assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
            }

            it("should start routing messages") {

                sut.configurationProvider.updateConfiguration(configurationWithoutRouting)

                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messages).isEmpty()

                sut.configurationProvider.updateConfiguration(basicConfiguration)

                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messagesAfterUpdate).hasSize(1)
                val message = messagesAfterUpdate[0]

                assertThat(message.topic).describedAs("routed message topic after configuration's change")
                        .isEqualTo(PERF3GPP_TOPIC)
                assertThat(message.partition).describedAs("routed message partition")
                        .isEqualTo(0)
            }

            it("should change domain routing") {

                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messages).hasSize(1)
                val firstMessage = messages[0]

                assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
                        .isEqualTo(PERF3GPP_TOPIC)
                assertThat(firstMessage.partition).describedAs("routed message partition")
                        .isEqualTo(0)

                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)

                // The same sink is reused, so two messages are expected after the second connection
                // (presumably the sink accumulates across connections); the new one is at index 1.
                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messagesAfterUpdate).hasSize(2)
                val secondMessage = messagesAfterUpdate[1]

                assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
                        .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
                assertThat(secondMessage.partition).describedAs("routed message partition")
                        .isEqualTo(0)
            }

            it("should update routing for each client sending one message") {

                val messagesAmount = 10
                val messagesForEachTopic = 5

                // One connection per message; the routing is switched right before the
                // connection with index `messagesForEachTopic` is handled.
                Flux.range(0, messagesAmount).doOnNext {
                    if (it == messagesForEachTopic) {
                        sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                    }
                }.doOnNext {
                    sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                }.then().block(defaultTimeout)

                val messages = sink.sentMessages
                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }

                assertThat(messages.size).isEqualTo(messagesAmount)
                // actual values go into assertThat so that a failure names the offending topic
                assertThat(firstTopicMessagesCount)
                        .describedAs("amount of messages routed to first topic")
                        .isEqualTo(messagesForEachTopic)
                assertThat(secondTopicMessagesCount)
                        .describedAs("amount of messages routed to second topic")
                        .isEqualTo(messagesForEachTopic)
            }

            it("should not update routing for client sending continuous stream of messages") {

                val messageStreamSize = 10
                val pivot = 5

                // A single connection carrying the whole stream; the configuration is changed
                // while the element with index `pivot` is being produced.
                val incomingMessages = Flux.range(0, messageStreamSize)
                        .doOnNext {
                            if (it == pivot) {
                                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                            }
                        }
                        .map { vesWireFrameMessage(PERF3GPP) }

                sut.collector.handleConnection(incomingMessages).block(defaultTimeout)

                val messages = sink.sentMessages
                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }

                // all messages of the already-open connection are expected on the original topic
                assertThat(messages.size).isEqualTo(messageStreamSize)
                assertThat(firstTopicMessagesCount)
                        .describedAs("amount of messages routed to first topic")
                        .isEqualTo(messageStreamSize)

                assertThat(secondTopicMessagesCount)
                        .describedAs("amount of messages routed to second topic")
                        .isEqualTo(0)
            }

            it("should mark the application healthy") {
                assertThat(sut.healthStateProvider.currentHealth)
                        .describedAs("application health state")
                        .isEqualTo(HealthDescription.HEALTHY)
            }
        }

        given("failed configuration change") {

            lateinit var sut: Sut

            // Built in a fixture rather than in the group body, so it is created per test
            // at execution time instead of once while the spec is being discovered.
            beforeEachTest {
                sut = vesHvWithStoringSink().first
                sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
                sut.configurationProvider.updateConfiguration(basicConfiguration)
            }

            it("should mark the application unhealthy ") {
                assertThat(sut.healthStateProvider.currentHealth)
                        .describedAs("application health state")
                        .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
            }
        }
    }

    describe("request validation") {
        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
            val (sut, sink) = vesHvWithStoringSink()

            val handledMessages = sut.handleConnection(sink,
                    vesWireFrameMessage(PERF3GPP, "first"),
                    messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
                    vesWireFrameMessage(PERF3GPP))

            // only the message preceding the oversized one is expected to be handled
            assertThat(handledMessages).hasSize(1)
            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
        }
    }

})
345
/**
 * Builds a [Sut] backed by a new [StoringSink] and applies [basicConfiguration] to it.
 *
 * @return the configured system under test paired with the sink it was constructed with
 */
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
    val storingSink = StoringSink()
    val collectorUnderTest = Sut(storingSink).also {
        it.configurationProvider.updateConfiguration(basicConfiguration)
    }
    return collectorUnderTest to storingSink
}