Align with latest HV-VES proto definition
[dcaegen2/collectors/hv-ves.git] / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / VesHvSpecification.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
41 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
43 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
44 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
45 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
46
47 import reactor.core.publisher.Flux
48 import java.time.Duration
49
50 /**
51  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
52  * @since May 2018
53  */
54 object VesHvSpecification : Spek({
55     debugRx(false)
56
57     describe("VES High Volume Collector") {
58         it("should handle multiple HV RAN events") {
59             val (sut, sink) = vesHvWithStoringSink()
60             val messages = sut.handleConnection(sink,
61                     vesWireFrameMessage(HVMEAS),
62                     vesWireFrameMessage(HVMEAS)
63             )
64
65             assertThat(messages)
66                     .describedAs("should send all events")
67                     .hasSize(2)
68         }
69
70         it("should not handle messages received from client after end-of-transmission message") {
71             val (sut, sink) = vesHvWithStoringSink()
72             val validMessage = vesWireFrameMessage(HVMEAS)
73             val anotherValidMessage = vesWireFrameMessage(HVMEAS)
74             val endOfTransmissionMessage = endOfTransmissionWireMessage()
75
76             val handledEvents = sut.handleConnection(sink,
77                     validMessage,
78                     endOfTransmissionMessage,
79                     anotherValidMessage
80             )
81
82             assertThat(handledEvents).hasSize(1)
83             assertThat(validMessage.refCnt())
84                     .describedAs("first message should be released")
85                     .isEqualTo(0)
86             assertThat(endOfTransmissionMessage.refCnt())
87                     .describedAs("end-of-transmission message should be released")
88                     .isEqualTo(0)
89             assertThat(anotherValidMessage.refCnt())
90                     .describedAs("second (not handled) message should not be released")
91                     .isEqualTo(1)
92         }
93     }
94
95     describe("Memory management") {
96         it("should release memory for each handled and dropped message") {
97             val (sut, sink) = vesHvWithStoringSink()
98             val validMessage = vesWireFrameMessage(HVMEAS)
99             val msgWithInvalidFrame = invalidWireFrame()
100             val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
101             val expectedRefCnt = 0
102
103             val handledEvents = sut.handleConnection(
104                     sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
105
106             assertThat(handledEvents).hasSize(1)
107
108             assertThat(validMessage.refCnt())
109                     .describedAs("handled message should be released")
110                     .isEqualTo(expectedRefCnt)
111             assertThat(msgWithInvalidFrame.refCnt())
112                     .describedAs("message with invalid frame should be released")
113                     .isEqualTo(expectedRefCnt)
114             assertThat(msgWithTooBigPayload.refCnt())
115                     .describedAs("message with payload exceeding 1MiB should be released")
116                     .isEqualTo(expectedRefCnt)
117         }
118
119         it("should release memory for end-of-transmission message") {
120             val (sut, sink) = vesHvWithStoringSink()
121             val validMessage = vesWireFrameMessage(HVMEAS)
122             val endOfTransmissionMessage = endOfTransmissionWireMessage()
123             val expectedRefCnt = 0
124
125             val handledEvents = sut.handleConnection(sink,
126                     validMessage,
127                     endOfTransmissionMessage
128             )
129
130             assertThat(handledEvents).hasSize(1)
131             assertThat(validMessage.refCnt())
132                     .describedAs("handled message should be released")
133                     .isEqualTo(expectedRefCnt)
134             assertThat(endOfTransmissionMessage.refCnt())
135                     .describedAs("end-of-transmission message should be released")
136                     .isEqualTo(expectedRefCnt)
137         }
138
139         it("should release memory for each message with invalid payload") {
140             val (sut, sink) = vesHvWithStoringSink()
141             val validMessage = vesWireFrameMessage(HVMEAS)
142             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
143             val expectedRefCnt = 0
144
145             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
146
147             assertThat(handledEvents).hasSize(1)
148
149             assertThat(validMessage.refCnt())
150                     .describedAs("handled message should be released")
151                     .isEqualTo(expectedRefCnt)
152             assertThat(msgWithInvalidPayload.refCnt())
153                     .describedAs("message with invalid payload should be released")
154                     .isEqualTo(expectedRefCnt)
155
156         }
157
158         it("should release memory for each message with garbage frame") {
159             val (sut, sink) = vesHvWithStoringSink()
160             val validMessage = vesWireFrameMessage(HVMEAS)
161             val msgWithGarbageFrame = garbageFrame()
162             val expectedRefCnt = 0
163
164             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
165
166             assertThat(handledEvents).hasSize(1)
167
168             assertThat(validMessage.refCnt())
169                     .describedAs("handled message should be released")
170                     .isEqualTo(expectedRefCnt)
171             assertThat(msgWithGarbageFrame.refCnt())
172                     .describedAs("message with garbage frame should be released")
173                     .isEqualTo(expectedRefCnt)
174
175         }
176     }
177
178     describe("message routing") {
179         it("should direct message to a topic by means of routing configuration") {
180             val (sut, sink) = vesHvWithStoringSink()
181
182             val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
183             assertThat(messages).describedAs("number of routed messages").hasSize(1)
184
185             val msg = messages[0]
186             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
187             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
188         }
189
190         it("should be able to direct 2 messages from different domains to one topic") {
191             val (sut, sink) = vesHvWithStoringSink()
192
193             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
194
195             val messages = sut.handleConnection(sink,
196                     vesWireFrameMessage(HVMEAS),
197                     vesWireFrameMessage(HEARTBEAT),
198                     vesWireFrameMessage(MEASUREMENT))
199
200             assertThat(messages).describedAs("number of routed messages").hasSize(3)
201
202             assertThat(messages[0].topic).describedAs("first message topic")
203                     .isEqualTo(HVMEAS_TOPIC)
204
205             assertThat(messages[1].topic).describedAs("second message topic")
206                     .isEqualTo(HVMEAS_TOPIC)
207
208             assertThat(messages[2].topic).describedAs("last message topic")
209                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
210         }
211
212         it("should drop message if route was not found") {
213             val (sut, sink) = vesHvWithStoringSink()
214             val messages = sut.handleConnection(sink,
215                     vesWireFrameMessage(OTHER, "first"),
216                     vesWireFrameMessage(HVMEAS, "second"),
217                     vesWireFrameMessage(HEARTBEAT, "third"))
218
219             assertThat(messages).describedAs("number of routed messages").hasSize(1)
220
221             val msg = messages[0]
222             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
223             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
224         }
225     }
226
227     describe("configuration update") {
228
229         val defaultTimeout = Duration.ofSeconds(10)
230
231         given("successful configuration change") {
232
233             lateinit var sut: Sut
234             lateinit var sink: StoringSink
235
236             beforeEachTest {
237                 vesHvWithStoringSink().run {
238                     sut = first
239                     sink = second
240                 }
241             }
242
243             it("should update collector") {
244                 val firstCollector = sut.collector
245
246                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
247                 val collectorAfterUpdate = sut.collector
248
249                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
250             }
251
252             it("should start routing messages") {
253
254                 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
255
256                 val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
257                 assertThat(messages).isEmpty()
258
259                 sut.configurationProvider.updateConfiguration(basicConfiguration)
260
261                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
262                 assertThat(messagesAfterUpdate).hasSize(1)
263                 val message = messagesAfterUpdate[0]
264
265                 assertThat(message.topic).describedAs("routed message topic after configuration's change")
266                         .isEqualTo(HVMEAS_TOPIC)
267                 assertThat(message.partition).describedAs("routed message partition")
268                         .isEqualTo(0)
269             }
270
271             it("should change domain routing") {
272
273                 val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
274                 assertThat(messages).hasSize(1)
275                 val firstMessage = messages[0]
276
277                 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
278                         .isEqualTo(HVMEAS_TOPIC)
279                 assertThat(firstMessage.partition).describedAs("routed message partition")
280                         .isEqualTo(0)
281
282
283                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
284
285                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
286                 assertThat(messagesAfterUpdate).hasSize(2)
287                 val secondMessage = messagesAfterUpdate[1]
288
289                 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
290                         .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
291                 assertThat(secondMessage.partition).describedAs("routed message partition")
292                         .isEqualTo(0)
293             }
294
295             it("should update routing for each client sending one message") {
296
297                 val messagesAmount = 10
298                 val messagesForEachTopic = 5
299
300                 Flux.range(0, messagesAmount).doOnNext {
301                     if (it == messagesForEachTopic) {
302                         sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
303                     }
304                 }.doOnNext {
305                     sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
306                 }.then().block(defaultTimeout)
307
308
309                 val messages = sink.sentMessages
310                 val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
311                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
312
313                 assertThat(messages.size).isEqualTo(messagesAmount)
314                 assertThat(messagesForEachTopic)
315                         .describedAs("amount of messages routed to each topic")
316                         .isEqualTo(firstTopicMessagesCount)
317                         .isEqualTo(secondTopicMessagesCount)
318             }
319
320             it("should not update routing for client sending continuous stream of messages") {
321
322                 val messageStreamSize = 10
323                 val pivot = 5
324
325                 val incomingMessages = Flux.range(0, messageStreamSize)
326                         .doOnNext {
327                             if (it == pivot) {
328                                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
329                                 println("config changed")
330                             }
331                         }
332                         .map { vesWireFrameMessage(HVMEAS) }
333
334
335                 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
336
337                 val messages = sink.sentMessages
338                 val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
339                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
340
341                 assertThat(messages.size).isEqualTo(messageStreamSize)
342                 assertThat(firstTopicMessagesCount)
343                         .describedAs("amount of messages routed to first topic")
344                         .isEqualTo(messageStreamSize)
345
346                 assertThat(secondTopicMessagesCount)
347                         .describedAs("amount of messages routed to second topic")
348                         .isEqualTo(0)
349             }
350
351             it("should mark the application healthy") {
352                 assertThat(sut.healthStateProvider.currentHealth)
353                         .describedAs("application health state")
354                         .isEqualTo(HealthDescription.HEALTHY)
355             }
356         }
357
358         given("failed configuration change") {
359             val (sut, _) = vesHvWithStoringSink()
360             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
361             sut.configurationProvider.updateConfiguration(basicConfiguration)
362
363             it("should mark the application unhealthy ") {
364                 assertThat(sut.healthStateProvider.currentHealth)
365                         .describedAs("application health state")
366                         .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
367             }
368         }
369     }
370
371     describe("request validation") {
372         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
373             val (sut, sink) = vesHvWithStoringSink()
374
375             val handledMessages = sut.handleConnection(sink,
376                     vesWireFrameMessage(HVMEAS, "first"),
377                     vesMessageWithTooBigPayload(HVMEAS),
378                     vesWireFrameMessage(HVMEAS))
379
380             assertThat(handledMessages).hasSize(1)
381             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
382         }
383     }
384
385 })
386
387 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
388     val sink = StoringSink()
389     val sut = Sut(sink)
390     sut.configurationProvider.updateConfiguration(basicConfiguration)
391     return Pair(sut, sink)
392 }