Custom detekt rule for logger usage check
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-ct / src / test / kotlin / org / onap / dcae / collectors / veshv / tests / component / VesHvSpecification.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.given
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
28 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
29 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
30 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
31 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
32 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
33 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
34 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
35 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
36 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
37 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
38 import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
39 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
40 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
41 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
42 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
43 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
44 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
45
46 import reactor.core.publisher.Flux
47 import java.time.Duration
48
49 /**
50  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
51  * @since May 2018
52  */
53 object VesHvSpecification : Spek({
54     debugRx(false)
55
56     describe("VES High Volume Collector") {
57         it("should handle multiple HV RAN events") {
58             val (sut, sink) = vesHvWithStoringSink()
59             val messages = sut.handleConnection(sink,
60                     vesWireFrameMessage(PERF3GPP),
61                     vesWireFrameMessage(PERF3GPP)
62             )
63
64             assertThat(messages)
65                     .describedAs("should send all events")
66                     .hasSize(2)
67         }
68     }
69
70     describe("Memory management") {
71         it("should release memory for each handled and dropped message") {
72             val (sut, sink) = vesHvWithStoringSink()
73             val validMessage = vesWireFrameMessage(PERF3GPP)
74             val msgWithInvalidFrame = invalidWireFrame()
75             val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
76             val expectedRefCnt = 0
77
78             val handledEvents = sut.handleConnection(
79                     sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
80
81             assertThat(handledEvents).hasSize(1)
82
83             assertThat(validMessage.refCnt())
84                     .describedAs("handled message should be released")
85                     .isEqualTo(expectedRefCnt)
86             assertThat(msgWithInvalidFrame.refCnt())
87                     .describedAs("message with invalid frame should be released")
88                     .isEqualTo(expectedRefCnt)
89             assertThat(msgWithTooBigPayload.refCnt())
90                     .describedAs("message with payload exceeding 1MiB should be released")
91                     .isEqualTo(expectedRefCnt)
92         }
93
94         it("should release memory for each message with invalid payload") {
95             val (sut, sink) = vesHvWithStoringSink()
96             val validMessage = vesWireFrameMessage(PERF3GPP)
97             val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
98             val expectedRefCnt = 0
99
100             val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
101
102             assertThat(handledEvents).hasSize(1)
103
104             assertThat(validMessage.refCnt())
105                     .describedAs("handled message should be released")
106                     .isEqualTo(expectedRefCnt)
107             assertThat(msgWithInvalidPayload.refCnt())
108                     .describedAs("message with invalid payload should be released")
109                     .isEqualTo(expectedRefCnt)
110
111         }
112
113         it("should release memory for each message with garbage frame") {
114             val (sut, sink) = vesHvWithStoringSink()
115             val validMessage = vesWireFrameMessage(PERF3GPP)
116             val msgWithGarbageFrame = garbageFrame()
117             val expectedRefCnt = 0
118
119             val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
120
121             assertThat(handledEvents).hasSize(1)
122
123             assertThat(validMessage.refCnt())
124                     .describedAs("handled message should be released")
125                     .isEqualTo(expectedRefCnt)
126             assertThat(msgWithGarbageFrame.refCnt())
127                     .describedAs("message with garbage frame should be released")
128                     .isEqualTo(expectedRefCnt)
129
130         }
131     }
132
133     describe("message routing") {
134         it("should direct message to a topic by means of routing configuration") {
135             val (sut, sink) = vesHvWithStoringSink()
136
137             val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
138             assertThat(messages).describedAs("number of routed messages").hasSize(1)
139
140             val msg = messages[0]
141             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
142             assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
143         }
144
145         it("should be able to direct 2 messages from different domains to one topic") {
146             val (sut, sink) = vesHvWithStoringSink()
147
148             sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
149
150             val messages = sut.handleConnection(sink,
151                     vesWireFrameMessage(PERF3GPP),
152                     vesWireFrameMessage(HEARTBEAT),
153                     vesWireFrameMessage(MEASUREMENT))
154
155             assertThat(messages).describedAs("number of routed messages").hasSize(3)
156
157             assertThat(messages[0].topic).describedAs("first message topic")
158                     .isEqualTo(PERF3GPP_TOPIC)
159
160             assertThat(messages[1].topic).describedAs("second message topic")
161                     .isEqualTo(PERF3GPP_TOPIC)
162
163             assertThat(messages[2].topic).describedAs("last message topic")
164                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
165         }
166
167         it("should drop message if route was not found") {
168             val (sut, sink) = vesHvWithStoringSink()
169             val messages = sut.handleConnection(sink,
170                     vesWireFrameMessage(OTHER, "first"),
171                     vesWireFrameMessage(PERF3GPP, "second"),
172                     vesWireFrameMessage(HEARTBEAT, "third"))
173
174             assertThat(messages).describedAs("number of routed messages").hasSize(1)
175
176             val msg = messages[0]
177             assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
178             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
179         }
180     }
181
182     describe("configuration update") {
183
184         val defaultTimeout = Duration.ofSeconds(10)
185
186         given("successful configuration change") {
187
188             lateinit var sut: Sut
189             lateinit var sink: StoringSink
190
191             beforeEachTest {
192                 vesHvWithStoringSink().run {
193                     sut = first
194                     sink = second
195                 }
196             }
197
198             it("should update collector") {
199                 val firstCollector = sut.collector
200
201                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
202                 val collectorAfterUpdate = sut.collector
203
204                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
205             }
206
207             it("should start routing messages") {
208
209                 sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
210
211                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
212                 assertThat(messages).isEmpty()
213
214                 sut.configurationProvider.updateConfiguration(basicConfiguration)
215
216                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
217                 assertThat(messagesAfterUpdate).hasSize(1)
218                 val message = messagesAfterUpdate[0]
219
220                 assertThat(message.topic).describedAs("routed message topic after configuration's change")
221                         .isEqualTo(PERF3GPP_TOPIC)
222                 assertThat(message.partition).describedAs("routed message partition")
223                         .isEqualTo(0)
224             }
225
226             it("should change domain routing") {
227
228                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
229                 assertThat(messages).hasSize(1)
230                 val firstMessage = messages[0]
231
232                 assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
233                         .isEqualTo(PERF3GPP_TOPIC)
234                 assertThat(firstMessage.partition).describedAs("routed message partition")
235                         .isEqualTo(0)
236
237
238                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
239
240                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
241                 assertThat(messagesAfterUpdate).hasSize(2)
242                 val secondMessage = messagesAfterUpdate[1]
243
244                 assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
245                         .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
246                 assertThat(secondMessage.partition).describedAs("routed message partition")
247                         .isEqualTo(0)
248             }
249
250             it("should update routing for each client sending one message") {
251
252                 val messagesAmount = 10
253                 val messagesForEachTopic = 5
254
255                 Flux.range(0, messagesAmount).doOnNext {
256                     if (it == messagesForEachTopic) {
257                         sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
258                     }
259                 }.doOnNext {
260                     sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
261                 }.then().block(defaultTimeout)
262
263
264                 val messages = sink.sentMessages
265                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
266                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
267
268                 assertThat(messages.size).isEqualTo(messagesAmount)
269                 assertThat(messagesForEachTopic)
270                         .describedAs("amount of messages routed to each topic")
271                         .isEqualTo(firstTopicMessagesCount)
272                         .isEqualTo(secondTopicMessagesCount)
273             }
274
275             it("should not update routing for client sending continuous stream of messages") {
276
277                 val messageStreamSize = 10
278                 val pivot = 5
279
280                 val incomingMessages = Flux.range(0, messageStreamSize)
281                         .doOnNext {
282                             if (it == pivot) {
283                                 sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
284                                 println("config changed")
285                             }
286                         }
287                         .map { vesWireFrameMessage(PERF3GPP) }
288
289
290                 sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
291
292                 val messages = sink.sentMessages
293                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
294                 val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
295
296                 assertThat(messages.size).isEqualTo(messageStreamSize)
297                 assertThat(firstTopicMessagesCount)
298                         .describedAs("amount of messages routed to first topic")
299                         .isEqualTo(messageStreamSize)
300
301                 assertThat(secondTopicMessagesCount)
302                         .describedAs("amount of messages routed to second topic")
303                         .isEqualTo(0)
304             }
305
306             it("should mark the application healthy") {
307                 assertThat(sut.healthStateProvider.currentHealth)
308                         .describedAs("application health state")
309                         .isEqualTo(HealthDescription.HEALTHY)
310             }
311         }
312
313         given("failed configuration change") {
314             val (sut, _) = vesHvWithStoringSink()
315             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
316             sut.configurationProvider.updateConfiguration(basicConfiguration)
317
318             it("should mark the application unhealthy ") {
319                 assertThat(sut.healthStateProvider.currentHealth)
320                         .describedAs("application health state")
321                         .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
322             }
323         }
324     }
325
326     describe("request validation") {
327         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
328             val (sut, sink) = vesHvWithStoringSink()
329
330             val handledMessages = sut.handleConnection(sink,
331                     vesWireFrameMessage(PERF3GPP, "first"),
332                     vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
333                     vesWireFrameMessage(PERF3GPP))
334
335             assertThat(handledMessages).hasSize(1)
336             assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
337         }
338     }
339
340 })
341
342 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
343     val sink = StoringSink()
344     val sut = Sut(sink)
345     sut.configurationProvider.updateConfiguration(basicConfiguration)
346     return Pair(sut, sink)
347 }