aa5810d388e124ecc7b5027ad3a14ca3e109ea0f
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import org.assertj.core.api.Assertions.assertThat
23 import org.jetbrains.spek.api.Spek
24 import org.jetbrains.spek.api.dsl.describe
25 import org.jetbrains.spek.api.dsl.it
26 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
27 import org.onap.dcae.collectors.veshv.model.Routing
28 import org.onap.dcae.collectors.veshv.model.routing
29 import org.onap.dcae.collectors.veshv.tests.fakes.*
30 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
31
/**
 * Component-level specification of the VES High Volume collector.
 *
 * Each case pushes messages through a `Sut` backed by a `StoringSink` and asserts on the list
 * returned by `handleConnection`. The groups cover event handling, release of message buffers
 * (`refCnt()` expected to be 0 afterwards), topic routing and request validation.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
object VesHvSpecification : Spek({
    debugRx(false)

    /**
     * Creates a `Sut` on top of a fresh `StoringSink` and applies `basicConfiguration` to it.
     *
     * Both objects are returned because `handleConnection` takes the sink as its first argument.
     */
    fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
        val sink = StoringSink()
        val sut = Sut(sink)
        sut.configurationProvider.updateConfiguration(basicConfiguration)
        return Pair(sut, sink)
    }

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(
                    sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))

            assertThat(messages)
                    .describedAs("should send all events")
                    .hasSize(2)
        }
    }

    describe("Memory management") {
        // Reference count every message is expected to report once the connection has been handled,
        // regardless of whether the message was routed or dropped.
        val expectedRefCnt = 0

        it("should release memory for each handled and dropped message") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithInvalidDomain = vesMessage(Domain.OTHER)
            val msgWithInvalidFrame = invalidWireFrame()
            val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)

            val handledEvents = sut.handleConnection(
                    sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)

            // Only the valid message is expected to come out of the collector.
            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidDomain.refCnt())
                    .describedAs("message with invalid domain should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidFrame.refCnt())
                    .describedAs("message with invalid frame should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithTooBigPayload.refCnt())
                    .describedAs("message with payload exceeding 1MiB should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with invalid payload") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithInvalidPayload = invalidVesMessage()

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithInvalidPayload.refCnt())
                    .describedAs("message with invalid payload should be released")
                    .isEqualTo(expectedRefCnt)
        }

        it("should release memory for each message with garbage frame") {
            val (sut, sink) = vesHvWithStoringSink()
            val validMessage = vesMessage(Domain.HVRANMEAS)
            val msgWithGarbageFrame = garbageFrame()

            val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)

            assertThat(handledEvents).hasSize(1)

            assertThat(validMessage.refCnt())
                    .describedAs("handled message should be released")
                    .isEqualTo(expectedRefCnt)
            assertThat(msgWithGarbageFrame.refCnt())
                    .describedAs("message with garbage frame should be released")
                    .isEqualTo(expectedRefCnt)
        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val (sut, sink) = vesHvWithStoringSink()

            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
        }

        it("should be able to direct 2 messages from different domains to one topic") {
            // This case needs its own configuration, so it does not use vesHvWithStoringSink().
            val sink = StoringSink()
            val sut = Sut(sink)

            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)

            val messages = sut.handleConnection(sink,
                    vesMessage(Domain.HVRANMEAS),
                    vesMessage(Domain.HEARTBEAT),
                    vesMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))

            assertThat(messages).describedAs("number of routed messages").hasSize(3)

            assertThat(messages[0].topic).describedAs("first message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)

            assertThat(messages[1].topic).describedAs("second message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)

            assertThat(messages[2].topic).describedAs("last message topic")
                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
        }

        it("should drop message if route was not found") {
            val (sut, sink) = vesHvWithStoringSink()
            val messages = sut.handleConnection(sink,
                    vesMessage(Domain.OTHER, "first"),
                    vesMessage(Domain.HVRANMEAS, "second"),
                    vesMessage(Domain.HEARTBEAT, "third"))

            assertThat(messages).describedAs("number of routed messages").hasSize(1)

            // Of the three messages only the HVRANMEAS one ("second") is expected to be routed.
            val msg = messages[0]
            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
            assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
        }
    }

    describe("request validation") {
        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
            val (sut, sink) = vesHvWithStoringSink()

            val handledMessages = sut.handleConnection(sink,
                    vesMessage(Domain.HVRANMEAS, "first"),
                    vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
                    vesMessage(Domain.HVRANMEAS, "third"))

            // "third" is valid on its own but follows the oversized message, so only "first" remains.
            assertThat(handledMessages).hasSize(1)
            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
        }
    }
})