08450598937cbc53691308eff6d8cdd8101b760e
[dcaegen2/collectors/hv-ves.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.tests.component
21
22 import com.google.protobuf.InvalidProtocolBufferException
23 import org.assertj.core.api.Assertions.assertThat
24 import org.jetbrains.spek.api.Spek
25 import org.jetbrains.spek.api.dsl.describe
26 import org.jetbrains.spek.api.dsl.it
27 import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
28 import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
29 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
30 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
31 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
32
33 /**
34  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
35  * @since May 2018
36  */
object VesHvSpecification : Spek({

    // Creates a Sut wired to the given sink and pushes basicConfiguration
    // through its configuration provider before handing it to the test.
    fun configuredSut(sink: StoringSink) = Sut(sink).also { collector ->
        collector.configurationProvider.updateConfiguration(basicConfiguration)
    }

    // Reference count every message buffer is expected to report once the
    // connection has been handled.
    val releasedRefCnt = 0

    // Asserts that an observed reference count equals releasedRefCnt, labelling
    // the assertion with the given description.
    fun assertReleased(actualRefCnt: Int, description: String) {
        assertThat(actualRefCnt)
                .describedAs(description)
                .isEqualTo(releasedRefCnt)
    }

    describe("VES High Volume Collector") {
        it("should handle multiple HV RAN events") {
            val storingSink = StoringSink()
            val collector = configuredSut(storingSink)

            val routed = collector.handleConnection(
                    storingSink,
                    vesMessage(Domain.HVRANMEAS),
                    vesMessage(Domain.HVRANMEAS))

            assertThat(routed)
                    .describedAs("should send all events")
                    .hasSize(2)
        }
    }

    describe("Memory management") {

        it("should release memory for each handled and dropped message") {
            val storingSink = StoringSink()
            val collector = configuredSut(storingSink)
            val accepted = vesMessage(Domain.HVRANMEAS)
            val wrongDomain = vesMessage(Domain.OTHER)
            val wrongFrame = invalidWireFrame()

            val delivered = collector.handleConnection(storingSink, accepted, wrongDomain, wrongFrame)

            // Only the HVRANMEAS message is expected to come out of the connection.
            assertThat(delivered).hasSize(1)

            assertReleased(accepted.refCnt(), "handled message should be released")
            assertReleased(wrongDomain.refCnt(), "message with invalid domain should be released")
            assertReleased(wrongFrame.refCnt(), "message with invalid frame should be released")
        }

        it("should release memory for each message with invalid payload") {
            val storingSink = StoringSink()
            val collector = configuredSut(storingSink)
            val accepted = vesMessage(Domain.HVRANMEAS)
            val wrongPayload = invalidVesMessage()

            val delivered = collector.handleConnection(storingSink, accepted, wrongPayload)

            assertThat(delivered).hasSize(1)

            assertReleased(accepted.refCnt(), "handled message should be released")
            assertReleased(wrongPayload.refCnt(), "message with invalid payload should be released")
        }

        it("should release memory for each message with garbage frame") {
            val storingSink = StoringSink()
            val collector = configuredSut(storingSink)
            val accepted = vesMessage(Domain.HVRANMEAS)
            val garbage = garbageFrame()

            val delivered = collector.handleConnection(storingSink, accepted, garbage)

            assertThat(delivered).hasSize(1)

            assertReleased(accepted.refCnt(), "handled message should be released")
            assertReleased(garbage.refCnt(), "message with garbage frame should be released")
        }
    }

    describe("message routing") {
        it("should direct message to a topic by means of routing configuration") {
            val storingSink = StoringSink()
            val collector = configuredSut(storingSink)

            val routed = collector.handleConnection(storingSink, vesMessage(Domain.HVRANMEAS))
            assertThat(routed)
                    .describedAs("number of routed messages")
                    .hasSize(1)

            val onlyMessage = routed[0]
            assertThat(onlyMessage.topic)
                    .describedAs("routed message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)
            assertThat(onlyMessage.partition)
                    .describedAs("routed message partition")
                    .isEqualTo(0)
        }

        it("should drop message if route was not found") {
            val storingSink = StoringSink()
            val collector = configuredSut(storingSink)

            // Three messages go in; the assertions below expect only the
            // HVRANMEAS one ("second") to be routed.
            val routed = collector.handleConnection(
                    storingSink,
                    vesMessage(Domain.OTHER, "first"),
                    vesMessage(Domain.HVRANMEAS, "second"),
                    vesMessage(Domain.HEARTBEAT, "third"))

            assertThat(routed)
                    .describedAs("number of routed messages")
                    .hasSize(1)

            val onlyMessage = routed[0]
            assertThat(onlyMessage.topic)
                    .describedAs("routed message topic")
                    .isEqualTo(HVRANMEAS_TOPIC)
            assertThat(onlyMessage.message.header.eventId)
                    .describedAs("routed message eventId")
                    .isEqualTo("second")
        }
    }
})