1f07c2336da7c5e9e9fba088b3fc6386bf06e2c3
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.tests.fakes.*
27 import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
28 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
29 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
30 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
31 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
32 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
33 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
34 import reactor.core.publisher.Flux
35 import java.time.Duration
36
37 /**
38  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
39  * @since May 2018
40  */
41 object VesHvSpecification : Spek({
42     debugRx(false)
43
44     describe("VES High Volume Collector") {
45         it("should handle multiple HV RAN events") {
46             val (sut, sink) = vesHvWithStoringSink()
47             val messages = sut.handleConnection(sink,
48                     vesWireFrameMessage(Domain.HVRANMEAS),
49                     vesWireFrameMessage(Domain.HVRANMEAS)
50             )
51
52             assertThat(messages)
53                     .describedAs("should send all events")
54                     .hasSize(2)
55         }
56
57         it("should not handle messages received from client after end-of-transmission message") {
58             val (sut, sink) = vesHvWithStoringSink()
59             val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
60             val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS)
61             val endOfTransmissionMessage = endOfTransmissionWireMessage()
62
63             val handledEvents = sut.handleConnection(sink,
64                     validMessage,
65                     endOfTransmissionMessage,
66                     anotherValidMessage
67             )
68
69             assertThat(handledEvents).hasSize(1)
70             assertThat(validMessage.refCnt())
71                     .describedAs("first message should be released")
72                     .isEqualTo(0)
73             assertThat(endOfTransmissionMessage.refCnt())
74                     .describedAs("end-of-transmission message should be released")
75                     .isEqualTo(0)
76             assertThat(anotherValidMessage.refCnt())
77                     .describedAs("second (not handled) message should not be released")
78                     .isEqualTo(1)
79         }
80     }
81
82     describe("Memory management") {
83         it("should release memory for each handled and dropped message") {
84             val (sut, sink) = vesHvWithStoringSink()
85             val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
86             val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER)
87             val msgWithInvalidFrame = invalidWireFrame()
88             val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
89             val expectedRefCnt = 0
90
91             val handledEvents = sut.handleConnection(
92                     sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
93
94             assertThat(handledEvents).hasSize(1)
95
96             assertThat(validMessage.refCnt())
97                     .describedAs("handled message should be released")
98                     .isEqualTo(expectedRefCnt)
99             assertThat(msgWithInvalidDomain.refCnt())
100                     .describedAs("message with invalid domain should be released")
101                     .isEqualTo(expectedRefCnt)
102             assertThat(msgWithInvalidFrame.refCnt())
103                     .describedAs("message with invalid frame should be released")
104                     .isEqualTo(expectedRefCnt)
105             assertThat(msgWithTooBigPayload.refCnt())
106                     .describedAs("message with payload exceeding 1MiB should be released")
107                     .isEqualTo(expectedRefCnt)
108         }
109
110         it("should release memory for end-of-transmission message") {
111             val (sut, sink) = vesHvWithStoringSink()
112             val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
113             val endOfTransmissionMessage = endOfTransmissionWireMessage()
114             val expectedRefCnt = 0
115
116             val handledEvents = sut.handleConnection(sink,
117                     validMessage,
118                     endOfTransmissionMessage
119             )
120
121             assertThat(handledEvents).hasSize(1)
122             assertThat(validMessage.refCnt())
123                     .describedAs("handled message should be released")
124                     .isEqualTo(expectedRefCnt)
125             assertThat(endOfTransmissionMessage.refCnt())
126                     .describedAs("end-of-transmission message should be released")
127                     .isEqualTo(expectedRefCnt)
128         }
129
130         it("should release memory for each message with invalid payload") {
131             val (sut, sink) = vesHvWithStoringSink()
132             val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
133             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
134             val expectedRefCnt = 0
135
136             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
137
138             assertThat(handledEvents).hasSize(1)
139
140             assertThat(validMessage.refCnt())
141                     .describedAs("handled message should be released")
142                     .isEqualTo(expectedRefCnt)
143             assertThat(msgWithInvalidPayload.refCnt())
144                     .describedAs("message with invalid payload should be released")
145                     .isEqualTo(expectedRefCnt)
146
147         }
148
149         it("should release memory for each message with garbage frame") {
150             val (sut, sink) = vesHvWithStoringSink()
151             val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
152             val msgWithGarbageFrame = garbageFrame()
153             val expectedRefCnt = 0
154
155             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
156
157             assertThat(handledEvents).hasSize(1)
158
159             assertThat(validMessage.refCnt())
160                     .describedAs("handled message should be released")
161                     .isEqualTo(expectedRefCnt)
162             assertThat(msgWithGarbageFrame.refCnt())
163                     .describedAs("message with garbage frame should be released")
164                     .isEqualTo(expectedRefCnt)
165
166         }
167     }
168
169     describe("message routing") {
170         it("should direct message to a topic by means of routing configuration") {
171             val (sut, sink) = vesHvWithStoringSink()
172
173             val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
174             assertThat(messages).describedAs("number of routed messages").hasSize(1)
175
176             val msg = messages[0]
177             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
178             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
179         }
180
181         it("should be able to direct 2 messages from different domains to one topic") {
182             val (sut, sink) = vesHvWithStoringSink()
183
184             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
185
186             val messages = sut.handleConnection(sink,
187                     vesWireFrameMessage(Domain.HVRANMEAS),
188                     vesWireFrameMessage(Domain.HEARTBEAT),
189                     vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
190
191             assertThat(messages).describedAs("number of routed messages").hasSize(3)
192
193             assertThat(messages[0].topic).describedAs("first message topic")
194                     .isEqualTo(HVRANMEAS_TOPIC)
195
196             assertThat(messages[1].topic).describedAs("second message topic")
197                     .isEqualTo(HVRANMEAS_TOPIC)
198
199             assertThat(messages[2].topic).describedAs("last message topic")
200                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
201         }
202
203         it("should drop message if route was not found") {
204             val (sut, sink) = vesHvWithStoringSink()
205             val messages = sut.handleConnection(sink,
206                     vesWireFrameMessage(Domain.OTHER, "first"),
207                     vesWireFrameMessage(Domain.HVRANMEAS, "second"),
208                     vesWireFrameMessage(Domain.HEARTBEAT, "third"))
209
210             assertThat(messages).describedAs("number of routed messages").hasSize(1)
211
212             val msg = messages[0]
213             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
214             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
215         }
216     }
217
218     describe("configuration update") {
219
220         val defaultTimeout = Duration.ofSeconds(10)
221
222         it("should update collector on configuration change") {
223             val (sut, _) = vesHvWithStoringSink()
224
225             sut.configurationProvider.updateConfiguration(basicConfiguration)
226             val firstCollector = sut.collector
227
228             sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
229             val collectorAfterUpdate = sut.collector
230
231             assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
232
233         }
234
235         it("should start routing messages on configuration change") {
236             val (sut, sink) = vesHvWithStoringSink()
237
238             sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
239
240             val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
241             assertThat(messages).isEmpty()
242
243             sut.configurationProvider.updateConfiguration(basicConfiguration)
244
245             val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
246             assertThat(messagesAfterUpdate).hasSize(1)
247             val message = messagesAfterUpdate[0]
248
249             assertThat(message.topic).describedAs("routed message topic after configuration's change")
250                     .isEqualTo(HVRANMEAS_TOPIC)
251             assertThat(message.partition).describedAs("routed message partition")
252                     .isEqualTo(0)
253         }
254
255         it("should change domain routing on configuration change") {
256             val (sut, sink) = vesHvWithStoringSink()
257
258             sut.configurationProvider.updateConfiguration(basicConfiguration)
259
260             val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
261             assertThat(messages).hasSize(1)
262             val firstMessage = messages[0]
263
264             assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
265                     .isEqualTo(HVRANMEAS_TOPIC)
266             assertThat(firstMessage.partition).describedAs("routed message partition")
267                     .isEqualTo(0)
268
269
270             sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
271
272             val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
273             assertThat(messagesAfterUpdate).hasSize(2)
274             val secondMessage = messagesAfterUpdate[1]
275
276             assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
277                     .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
278             assertThat(secondMessage.partition).describedAs("routed message partition")
279                     .isEqualTo(0)
280         }
281
282         it("should update routing for each client sending one message") {
283             val (sut, sink) = vesHvWithStoringSink()
284
285             sut.configurationProvider.updateConfiguration(basicConfiguration)
286
287             val messagesAmount = 10
288             val messagesForEachTopic = 5
289
290             Flux.range(0, messagesAmount).doOnNext {
291                 if (it == messagesForEachTopic) {
292                     sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
293                 }
294             }.doOnNext {
295                 sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
296             }.then().block(defaultTimeout)
297
298
299             val messages = sink.sentMessages
300             val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
301             val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
302
303             assertThat(messages.size).isEqualTo(messagesAmount)
304             assertThat(messagesForEachTopic)
305                     .describedAs("amount of messages routed to each topic")
306                     .isEqualTo(firstTopicMessagesCount)
307                     .isEqualTo(secondTopicMessagesCount)
308         }
309
310
311         it("should not update routing for client sending continuous stream of messages") {
312             val (sut, sink) = vesHvWithStoringSink()
313
314             sut.configurationProvider.updateConfiguration(basicConfiguration)
315
316             val messageStreamSize = 10
317             val pivot = 5
318
319             val incomingMessages = Flux.range(0, messageStreamSize)
320                     .doOnNext {
321                         if (it == pivot) {
322                             sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
323                             println("config changed")
324                         }
325                     }
326                     .map { vesWireFrameMessage(Domain.HVRANMEAS) }
327
328
329             sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
330
331             val messages = sink.sentMessages
332             val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
333             val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
334
335             assertThat(messages.size).isEqualTo(messageStreamSize)
336             assertThat(firstTopicMessagesCount)
337                     .describedAs("amount of messages routed to first topic")
338                     .isEqualTo(messageStreamSize)
339
340             assertThat(secondTopicMessagesCount)
341                     .describedAs("amount of messages routed to second topic")
342                     .isEqualTo(0)
343         }
344     }
345
346     describe("request validation") {
347         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
348             val (sut, sink) = vesHvWithStoringSink()
349
350             val handledMessages = sut.handleConnection(sink,
351                     vesWireFrameMessage(Domain.HVRANMEAS, "first"),
352                     vesMessageWithTooBigPayload(Domain.HVRANMEAS),
353                     vesWireFrameMessage(Domain.HVRANMEAS))
354
355             assertThat(handledMessages).hasSize(1)
356             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
357         }
358     }
359
360 })
361
362 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
363     val sink = StoringSink()
364     val sut = Sut(sink)
365     sut.configurationProvider.updateConfiguration(basicConfiguration)
366     return Pair(sut, sink)
367 }