246fc7ed73b71b64147376b36519ffd0a013e93a
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.tests.fakes.*
27 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
28 import reactor.core.publisher.Flux
29 import java.time.Duration
30
/**
 * Component-level specification of the VES High Volume collector.
 *
 * Every scenario creates a fresh Sut backed by a StoringSink, applies a routing configuration
 * and pushes messages through the collector. The groups below cover event handling, release of
 * message buffers, message routing, configuration updates and request validation.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object VesHvSpecification : Spek({
    debugRx(false)

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val sink = StoringSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicConfiguration)
            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))

            assertThat(messages)
                    .describedAs("should send all events")
                    .hasSize(2)
        }
    }

    describe("Memory management") {
        it("should release memory for each handled and dropped message") {
            val sink = StoringSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicConfiguration)
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithInvalidDomain = vesMessage(Domain.OTHER)
            val msgWithInvalidFrame = invalidWireFrame()
            val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
            // A reference count of 0 is what this spec treats as "released".
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(
                    sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)

            // Only the valid message is expected to reach the sink.
            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidDomain.refCnt())
                    .describedAs("message with invalid domain should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidFrame.refCnt())
                    .describedAs("message with invalid frame should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithTooBigPayload.refCnt())
                    .describedAs("message with payload exceeding 1MiB should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with invalid payload") {
            val sink = StoringSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicConfiguration)
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithInvalidPayload = invalidVesMessage()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidPayload.refCnt())
                    .describedAs("message with invalid payload should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with garbage frame") {
            val sink = StoringSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicConfiguration)
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithGarbageFrame = garbageFrame()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithGarbageFrame.refCnt())
                    .describedAs("message with garbage frame should be released")
                    .isEqualTo(expectedRefCnt)
        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val sink = StoringSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
        }

        it("should be able to direct 2 messages from different domains to one topic") {
            val sink = StoringSink()
            val sut = Sut(sink)

            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)

            val messages = sut.handleConnection(sink,
                    vesMessage(Domain.HVRANMEAS),
                    vesMessage(Domain.HEARTBEAT),
                    vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))

            assertThat(messages).describedAs("number of routed messages").hasSize(3)

            // The first two domains are expected on the same topic; the third keeps its own.
            assertThat(messages[0].topic).describedAs("first message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)

            assertThat(messages[1].topic).describedAs("second message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)

            assertThat(messages[2].topic).describedAs("last message topic")
                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
        }

        it("should drop message if route was not found") {
            val sink = StoringSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicConfiguration)
            val messages = sut.handleConnection(sink,
                    vesMessage(Domain.OTHER, "first"),
                    vesMessage(Domain.HVRANMEAS, "second"),
                    vesMessage(Domain.HEARTBEAT, "third"))

            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
            assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
        }
    }

    describe("configuration update") {

        // Upper bound for blocking on the reactive pipelines used in this group.
        val defaultTimeout = Duration.ofSeconds(10)

        it("should update collector on configuration change") {
            val sink = StoringSink()
            val sut = Sut(sink)

            sut.configurationProvider.updateConfiguration(basicConfiguration)
            val firstCollector = sut.collector

            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
            val collectorAfterUpdate = sut.collector

            assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
        }

        it("should start routing messages on configuration change") {
            val sink = StoringSink()
            val sut = Sut(sink)

            sut.configurationProvider.updateConfiguration(configurationWithoutRouting)

            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messages).isEmpty()

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messagesAfterUpdate).hasSize(1)
            val message = messagesAfterUpdate[0]

            assertThat(message.topic).describedAs("routed message topic after configuration's change")
                    .isEqualTo(HVRANMEAS_TOPIC)
            assertThat(message.partition).describedAs("routed message partition")
                    .isEqualTo(0)
        }

        it("should change domain routing on configuration change") {
            val sink = StoringSink()
            val sut = Sut(sink)

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messages).hasSize(1)
            val firstMessage = messages[0]

            assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
                    .isEqualTo(HVRANMEAS_TOPIC)
            assertThat(firstMessage.partition).describedAs("routed message partition")
                    .isEqualTo(0)

            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)

            // NOTE(review): size 2 presumably means the result reflects everything the shared sink
            // has stored so far, including the message from the first connection - confirm against Sut.
            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messagesAfterUpdate).hasSize(2)
            val secondMessage = messagesAfterUpdate[1]

            assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
                    .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
            assertThat(secondMessage.partition).describedAs("routed message partition")
                    .isEqualTo(0)
        }

        it("should update routing for each client sending one message") {
            val sink = StoringSink()
            val sut = Sut(sink)

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messagesAmount = 10
            val messagesForEachTopic = 5

            // Each element opens a separate connection carrying one message; the configuration
            // is swapped right before the connection with index `messagesForEachTopic`.
            Flux.range(0, messagesAmount).doOnNext {
                if (it == messagesForEachTopic) {
                    sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                }
            }.doOnNext {
                sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            }.then().block(defaultTimeout)

            val messages = sink.sentMessages
            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }

            assertThat(messages.size).isEqualTo(messagesAmount)
            // The measured counts are the assertion subjects, so a failure names the offending topic.
            assertThat(firstTopicMessagesCount)
                    .describedAs("amount of messages routed to first topic")
                    .isEqualTo(messagesForEachTopic)
            assertThat(secondTopicMessagesCount)
                    .describedAs("amount of messages routed to second topic")
                    .isEqualTo(messagesForEachTopic)
        }

        it("should not update routing for client sending continuous stream of messages") {
            val sink = StoringSink()
            val sut = Sut(sink)

            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val messageStreamSize = 10
            val pivot = 5

            // The configuration is swapped while the single stream is still being emitted.
            val incomingMessages = Flux.range(0, messageStreamSize)
                    .doOnNext {
                        if (it == pivot) {
                            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                        }
                    }
                    .map { vesMessage(Domain.HVRANMEAS) }

            sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)

            val messages = sink.sentMessages
            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }

            // The whole stream is expected on the topic from the initial configuration.
            assertThat(messages.size).isEqualTo(messageStreamSize)
            assertThat(firstTopicMessagesCount)
                    .describedAs("amount of messages routed to first topic")
                    .isEqualTo(messageStreamSize)

            assertThat(secondTopicMessagesCount)
                    .describedAs("amount of messages routed to second topic")
                    .isEqualTo(0)
        }
    }

    describe("request validation") {
        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
            val sink = StoringSink()
            val sut = Sut(sink)
            sut.configurationProvider.updateConfiguration(basicConfiguration)

            val handledMessages = sut.handleConnection(sink,
                    vesMessage(Domain.HVRANMEAS, "first"),
                    vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
                    vesMessage(Domain.HVRANMEAS, "third"))

            // Only the message preceding the oversized one is expected to be handled.
            assertThat(handledMessages).hasSize(1)
            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
        }
    }

})