Add metrics for dropped messages
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / VesHvSpecification.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.*
41
42 import reactor.core.publisher.Flux
43 import java.time.Duration
44
/**
 * Component-level specification of the VES High Volume Collector.
 *
 * Each scenario builds a [Sut] together with a [StoringSink], pushes wire frame messages
 * through `handleConnection` and asserts on the outcome: number of routed messages, their
 * topic and partition, the reference count of the input buffers and the reported health state.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object VesHvSpecification : Spek({
    debugRx(false)

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(sink,
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(PERF3GPP)
            )

            assertThat(messages)
                    .describedAs("should send all events")
                    .hasSize(2)
        }
    }

    describe("Memory management") {
        // Every scenario below sends one valid message plus at least one broken one and then
        // checks that the reference count of each input buffer has dropped to zero.
        it("should release memory for each handled and dropped message") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
            val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(
                    sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidFrame.refCnt())
                    .describedAs("message with invalid frame should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithTooBigPayload.refCnt())
                    .describedAs("message with payload exceeding 1MiB should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with invalid payload") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidPayload.refCnt())
                    .describedAs("message with invalid payload should be released")
                    .isEqualTo(expectedRefCnt)

        }

        it("should release memory for each message with garbage frame") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesWireFrameMessage(PERF3GPP)
            val msgWithGarbageFrame = garbageFrame()
            val expectedRefCnt = 0

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithGarbageFrame.refCnt())
                    .describedAs("message with garbage frame should be released")
                    .isEqualTo(expectedRefCnt)

        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val (sut, sink) = vesHvWithStoringSink()

            val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
        }

        it("should be able to direct 2 messages from different domains to one topic") {
            val (sut, sink) = vesHvWithStoringSink()

            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)

            val messages = sut.handleConnection(sink,
                    vesWireFrameMessage(PERF3GPP),
                    vesWireFrameMessage(HEARTBEAT),
                    vesWireFrameMessage(MEASUREMENT))

            assertThat(messages).describedAs("number of routed messages").hasSize(3)

            // PERF3GPP and HEARTBEAT are expected on the same topic, MEASUREMENT on its own one.
            assertThat(messages[0].topic).describedAs("first message topic")
                    .isEqualTo(PERF3GPP_TOPIC)

            assertThat(messages[1].topic).describedAs("second message topic")
                    .isEqualTo(PERF3GPP_TOPIC)

            assertThat(messages[2].topic).describedAs("last message topic")
                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
        }

        it("should drop message if route was not found") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(sink,
                    vesWireFrameMessage(OTHER, "first"),
                    vesWireFrameMessage(PERF3GPP, "second"),
                    vesWireFrameMessage(HEARTBEAT, "third"))

            // Only the PERF3GPP event ("second") is expected to make it to the sink.
            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
            assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
        }
    }

    describe("configuration update") {

        // Upper bound for blocking on the reactive pipelines used in this group.
        val defaultTimeout = Duration.ofSeconds(10)

        given("successful configuration change") {

            lateinit var sut: Sut
            lateinit var sink: StoringSink

            // A fresh Sut/sink pair per test keeps the scenarios independent of each other.
            beforeEachTest {
                vesHvWithStoringSink().run {
                    sut = first
                    sink = second
                }
            }

            it("should update collector") {
                val firstCollector = sut.collector

                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                val collectorAfterUpdate = sut.collector

                assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
            }

            it("should start routing messages") {

                sut.configurationProvider.updateConfiguration(configurationWithoutRouting)

                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messages).isEmpty()

                sut.configurationProvider.updateConfiguration(basicConfiguration)

                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messagesAfterUpdate).hasSize(1)
                val message = messagesAfterUpdate[0]

                assertThat(message.topic).describedAs("routed message topic after configuration's change")
                        .isEqualTo(PERF3GPP_TOPIC)
                assertThat(message.partition).describedAs("routed message partition")
                        .isEqualTo(0)
            }

            it("should change domain routing") {

                val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messages).hasSize(1)
                val firstMessage = messages[0]

                assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
                        .isEqualTo(PERF3GPP_TOPIC)
                assertThat(firstMessage.partition).describedAs("routed message partition")
                        .isEqualTo(0)


                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)

                // The same sink is reused, so the result of the second connection is expected to
                // hold both the earlier message and the one routed after the update (index 1).
                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                assertThat(messagesAfterUpdate).hasSize(2)
                val secondMessage = messagesAfterUpdate[1]

                assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
                        .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
                assertThat(secondMessage.partition).describedAs("routed message partition")
                        .isEqualTo(0)
            }

            it("should update routing for each client sending one message") {

                val messagesAmount = 10
                val messagesForEachTopic = 5

                // Every element opens its own connection; the configuration is switched right
                // before the connection with index `messagesForEachTopic` is handled.
                Flux.range(0, messagesAmount).doOnNext {
                    if (it == messagesForEachTopic) {
                        sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                    }
                }.doOnNext {
                    sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                }.then().block(defaultTimeout)


                val messages = sink.sentMessages
                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }

                assertThat(messages.size).isEqualTo(messagesAmount)
                // Observed counts are the values under test and the constant is the expectation,
                // so a failure names the topic whose count is off.
                assertThat(firstTopicMessagesCount)
                        .describedAs("amount of messages routed to first topic")
                        .isEqualTo(messagesForEachTopic)
                assertThat(secondTopicMessagesCount)
                        .describedAs("amount of messages routed to second topic")
                        .isEqualTo(messagesForEachTopic)
            }

            it("should not update routing for client sending continuous stream of messages") {

                val messageStreamSize = 10
                val pivot = 5

                // A single connection carries the whole stream; the configuration is switched
                // while the element with index `pivot` is being emitted.
                val incomingMessages = Flux.range(0, messageStreamSize)
                        .doOnNext {
                            if (it == pivot) {
                                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
                            }
                        }
                        .map { vesWireFrameMessage(PERF3GPP) }


                sut.collector.handleConnection(incomingMessages).block(defaultTimeout)

                val messages = sink.sentMessages
                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }

                assertThat(messages.size).isEqualTo(messageStreamSize)
                assertThat(firstTopicMessagesCount)
                        .describedAs("amount of messages routed to first topic")
                        .isEqualTo(messageStreamSize)

                assertThat(secondTopicMessagesCount)
                        .describedAs("amount of messages routed to second topic")
                        .isEqualTo(0)
            }

            it("should mark the application healthy") {
                assertThat(sut.healthStateProvider.currentHealth)
                        .describedAs("application health state")
                        .isEqualTo(HealthDescription.HEALTHY)
            }
        }

        given("failed configuration change") {
            // The provider is switched to failing mode before the next update is pushed.
            val (sut, _) = vesHvWithStoringSink()
            sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
            sut.configurationProvider.updateConfiguration(basicConfiguration)

            it("should mark the application unhealthy ") {
                assertThat(sut.healthStateProvider.currentHealth)
                        .describedAs("application health state")
                        .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
            }
        }
    }

    describe("request validation") {
        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
            val (sut, sink) = vesHvWithStoringSink()

            val handledMessages = sut.handleConnection(sink,
                    vesWireFrameMessage(PERF3GPP, "first"),
                    messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
                    vesWireFrameMessage(PERF3GPP))

            // Only the message sent before the oversized one is expected to be handled.
            assertThat(handledMessages).hasSize(1)
            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
        }
    }

})
337
/**
 * Builds a [Sut] on top of a new [StoringSink] and applies [basicConfiguration] to it.
 *
 * @return the configured system under test paired with the sink it was created with
 */
private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
    val storingSink = StoringSink()
    val systemUnderTest = Sut(storingSink).also {
        it.configurationProvider.updateConfiguration(basicConfiguration)
    }
    return systemUnderTest to storingSink
}