5e6e666f2d157932ddec7f68766bae00bada090f
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.tests.fakes.*
27 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
28 import reactor.core.publisher.Flux
29 import java.time.Duration
30
/**
 * Component-level specification of the VES High Volume collector.
 *
 * Every case obtains a [Sut] together with its [StoringSink] from [vesHvWithStoringSink], feeds
 * messages through the collector and then asserts on the messages that reached the sink and/or
 * on the reference counts (`refCnt()`) of the input messages.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object VesHvSpecification : Spek({
    // NOTE(review): debugRx is declared outside this file; presumably it toggles Reactor debugging - confirm.
    debugRx(false)

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))

            assertThat(messages)
                    .describedAs("should send all events")
                    .hasSize(2)
        }

        it("should not handle messages received from client after end-of-transmission message") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val anotherValidMessage = vesMessage(Domain.HVRANMEAS)
            val endOfTransmissionMessage = endOfTransmissionMessage()

            val handledEvents = sut.handleConnection(sink,
                    validMessage,
                    endOfTransmissionMessage,
                    anotherValidMessage
            )

            assertThat(handledEvents).hasSize(1)
            assertThat(validMessage.refCnt())
                    .describedAs("first message should be released")
                    .isEqualTo(0)
            assertThat(endOfTransmissionMessage.refCnt())
                    .describedAs("end-of-transmission message should be released")
                    .isEqualTo(0)
            // The message sent after end-of-transmission is expected to stay untouched,
            // hence its reference count is still 1.
            assertThat(anotherValidMessage.refCnt())
                    .describedAs("second (not handled) message should not be released")
                    .isEqualTo(1)
        }
    }

    describe("Memory management") {
        it("should release memory for each handled and dropped message") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithInvalidDomain = vesMessage(Domain.OTHER)
            val msgWithInvalidFrame = invalidWireFrame()
            val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(
                    sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)

            // Only the valid message is expected to reach the sink; all four inputs must be released.
            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidDomain.refCnt())
                    .describedAs("message with invalid domain should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidFrame.refCnt())
                    .describedAs("message with invalid frame should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithTooBigPayload.refCnt())
                    .describedAs("message with payload exceeding 1MiB should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for end-of-transmission message") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val endOfTransmissionMessage = endOfTransmissionMessage()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink,
                    validMessage,
                    endOfTransmissionMessage
            )

            assertThat(handledEvents).hasSize(1)
            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(endOfTransmissionMessage.refCnt())
                    .describedAs("end-of-transmission message should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with invalid payload") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithInvalidPayload = invalidVesMessage()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidPayload.refCnt())
                    .describedAs("message with invalid payload should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with garbage frame") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithGarbageFrame = garbageFrame()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithGarbageFrame.refCnt())
                    .describedAs("message with garbage frame should be released")
                    .isEqualTo(expectedRefCnt)
        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val (sut, sink) = vesHvWithStoringSink()

            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
        }

        it("should be able to direct 2 messages from different domains to one topic") {
            val (sut, sink) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)

            val messages = sut.handleConnection(sink,
                    vesMessage(Domain.HVRANMEAS),
                    vesMessage(Domain.HEARTBEAT),
                    vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))

            assertThat(messages).describedAs("number of routed messages").hasSize(3)

            // The first two domains are expected on the same topic, the third one on its own topic.
            assertThat(messages[0].topic).describedAs("first message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)

            assertThat(messages[1].topic).describedAs("second message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)

            assertThat(messages[2].topic).describedAs("last message topic")
                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
        }

        it("should drop message if route was not found") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(sink,
                    vesMessage(Domain.OTHER, "first"),
                    vesMessage(Domain.HVRANMEAS, "second"),
                    vesMessage(Domain.HEARTBEAT, "third"))

            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
            assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
        }
    }

    describe("configuration update") {

        // Upper bound for blocking on the reactive pipelines used by the cases below.
        val defaultTimeout = Duration.ofSeconds(10)

        it("should update collector on configuration change") {
            val (sut, _) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(basicConfiguration)
            val firstCollector = sut.collector

            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
            val collectorAfterUpdate = sut.collector

            assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
        }

        it("should start routing messages on configuration change") {
            val (sut, sink) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(configurationWithoutRouting)

            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messages).isEmpty()

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messagesAfterUpdate).hasSize(1)
            val message = messagesAfterUpdate[0]

            assertThat(message.topic).describedAs("routed message topic after configuration's change")
                    .isEqualTo(HVRANMEAS_TOPIC)
            assertThat(message.partition).describedAs("routed message partition")
                    .isEqualTo(0)
        }

        it("should change domain routing on configuration change") {
            val (sut, sink) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messages).hasSize(1)
            val firstMessage = messages[0]

            assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
                    .isEqualTo(HVRANMEAS_TOPIC)
            assertThat(firstMessage.partition).describedAs("routed message partition")
                    .isEqualTo(0)

            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)

            // The same sink is reused, so two entries are expected here: the message routed before
            // the update and the one routed after it (presumably the result mirrors the sink content).
            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messagesAfterUpdate).hasSize(2)
            val secondMessage = messagesAfterUpdate[1]

            assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
                    .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
            assertThat(secondMessage.partition).describedAs("routed message partition")
                    .isEqualTo(0)
        }

        it("should update routing for each client sending one message") {
            val (sut, sink) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messagesAmount = 10
            val messagesForEachTopic = 5

            // Every element opens a separate connection carrying a single message; the routing
            // configuration is swapped right before the connection with index messagesForEachTopic.
            Flux.range(0, messagesAmount).doOnNext {
                if (it == messagesForEachTopic) {
                    sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                }
            }.doOnNext {
                sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            }.then().block(defaultTimeout)

            val messages = sink.sentMessages
            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }

            assertThat(messages.size).isEqualTo(messagesAmount)
            // Measured counts are the "actual" side so that a failure names the offending topic.
            assertThat(firstTopicMessagesCount)
                    .describedAs("amount of messages routed to first topic")
                    .isEqualTo(messagesForEachTopic)
            assertThat(secondTopicMessagesCount)
                    .describedAs("amount of messages routed to second topic")
                    .isEqualTo(messagesForEachTopic)
        }

        it("should not update routing for client sending continuous stream of messages") {
            val (sut, sink) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messageStreamSize = 10
            val pivot = 5

            // A single connection whose stream triggers a configuration change once it reaches the pivot.
            val incomingMessages = Flux.range(0, messageStreamSize)
                    .doOnNext {
                        if (it == pivot) {
                            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                        }
                    }
                    .map { vesMessage(Domain.HVRANMEAS) }

            sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)

            val messages = sink.sentMessages
            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }

            // The whole stream is expected on the topic that was configured when the connection started.
            assertThat(messages.size).isEqualTo(messageStreamSize)
            assertThat(firstTopicMessagesCount)
                    .describedAs("amount of messages routed to first topic")
                    .isEqualTo(messageStreamSize)

            assertThat(secondTopicMessagesCount)
                    .describedAs("amount of messages routed to second topic")
                    .isEqualTo(0)
        }
    }

    describe("request validation") {
        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
            val (sut, sink) = vesHvWithStoringSink()

            val handledMessages = sut.handleConnection(sink,
                    vesMessage(Domain.HVRANMEAS, "first"),
                    vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
                    vesMessage(Domain.HVRANMEAS, "third"))

            // Only the message preceding the oversized one is expected to be handled.
            assertThat(handledMessages).hasSize(1)
            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
        }
    }

})
352
/**
 * Creates a [Sut] backed by a fresh [StoringSink] and applies [basicConfiguration] to it.
 *
 * @return the configured system under test paired with the sink it was constructed with
 */
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
    val storingSink = StoringSink()
    val systemUnderTest = Sut(storingSink).also {
        it.configurationProvider.updateConfiguration(basicConfiguration)
    }
    return systemUnderTest to storingSink
}