Bump checkstyle version
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / wire / WireChunkDecoderTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.wire
21
22 import io.netty.buffer.ByteBuf
23 import io.netty.buffer.Unpooled
24 import io.netty.buffer.UnpooledByteBufAllocator
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
31 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
32 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
33 import reactor.test.test
34
/**
 * Specification of [WireChunkDecoder]: checks which frames (or errors) are emitted for different
 * ways of chunking the wire protocol stream and which reference count the input buffers end up with.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since May 2018
 */
internal object WireChunkDecoderTest : Spek({
    val alloc = UnpooledByteBufAllocator.DEFAULT
    val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
    val anotherPayload = "ala ma kota a kot ma ale".toByteArray()

    val encoder = WireFrameEncoder(alloc)

    /** Encodes [frame] with the shared [encoder] and feeds the resulting buffer to this decoder. */
    fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))

    /** Creates a fresh decoder under test, limited to the default maximum payload size. */
    fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)

    /**
     * Asserts that every buffer in [byteBuffers] has exactly [expectedRefCnt] references.
     * [expectation] is the human-readable prefix of the assertion description.
     */
    fun verifyRefCount(expectedRefCnt: Int, expectation: String, byteBuffers: Array<out ByteBuf>) {
        byteBuffers.forEach { bb ->
            assertThat(bb.refCnt())
                    .describedAs("$expectation: $bb ref count")
                    .isEqualTo(expectedRefCnt)
        }
    }

    // NOTE(review): the memory-related `it` blocks below inspect buffers that were consumed by the
    // preceding `it` of the same `given`; presumably this relies on Spek running `it` blocks in
    // declaration order - confirm before reordering or isolating them.

    /** Asserts that each of [byteBuffers] has a reference count of 0. */
    fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) =
            verifyRefCount(0, "should be released", byteBuffers)

    /** Asserts that each of [byteBuffers] still has a reference count of exactly 1. */
    fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) =
            verifyRefCount(1, "should not be released", byteBuffers)

    describe("decoding wire protocol") {
        given("empty input") {
            val input = Unpooled.EMPTY_BUFFER

            it("should yield empty result") {
                createInstance().decode(input).test().verifyComplete()
            }
        }

        given("input with no readable bytes") {
            // One-byte buffer whose reader index is moved past that byte, so nothing is readable.
            val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)

            it("should yield empty result") {
                createInstance().decode(input).test().verifyComplete()
            }

            it("should release memory") {
                verifyMemoryReleased(input)
            }
        }

        given("invalid input (not starting with marker)") {
            // Raw payload bytes that were never passed through the encoder.
            val input = Unpooled.wrappedBuffer(samplePayload)

            it("should yield error") {
                createInstance().decode(input).test()
                        .verifyError(WireFrameException::class.java)
            }

            it("should leave memory unreleased") {
                verifyMemoryNotReleased(input)
            }
        }

        given("valid input") {
            // Passed as a message; the local `decode(frame)` extension encodes it first.
            val input = WireFrameMessage(samplePayload)

            it("should yield decoded input frame") {
                createInstance().decode(input).test()
                        .expectNextMatches { frame -> frame.payloadSize == samplePayload.size }
                        .verifyComplete()
            }
        }

        given("valid input with part of next frame") {
            // One complete encoded frame followed by only the first 3 bytes of a second one.
            val input = Unpooled.buffer()
                    .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
                    .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))

            it("should yield decoded input frame") {
                createInstance().decode(input).test()
                        .expectNextMatches { frame -> frame.payloadSize == samplePayload.size }
                        .verifyComplete()
            }

            it("should leave memory unreleased") {
                verifyMemoryNotReleased(input)
            }
        }

        given("valid input with garbage after it") {
            // One complete encoded frame followed by raw, unencoded payload bytes.
            val input = Unpooled.buffer()
                    .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
                    .writeBytes(Unpooled.wrappedBuffer(samplePayload))

            it("should yield decoded input frame and error") {
                createInstance().decode(input).test()
                        .expectNextMatches { frame -> frame.payloadSize == samplePayload.size }
                        .verifyError(WireFrameException::class.java)
            }

            it("should leave memory unreleased") {
                verifyMemoryNotReleased(input)
            }
        }

        given("two inputs containing two separate messages") {
            val input1 = encoder.encode(WireFrameMessage(samplePayload))
            val input2 = encoder.encode(WireFrameMessage(anotherPayload))

            it("should yield decoded input frames") {
                // The same decoder instance handles both chunks.
                val cut = createInstance()
                cut.decode(input1).test()
                        .expectNextMatches { frame -> frame.payloadSize == samplePayload.size }
                        .verifyComplete()
                cut.decode(input2).test()
                        .expectNextMatches { frame -> frame.payloadSize == anotherPayload.size }
                        .verifyComplete()
            }

            it("should release memory") {
                verifyMemoryReleased(input1, input2)
            }
        }

        given("1st input containing 1st frame and 2nd input containing garbage") {
            val input1 = encoder.encode(WireFrameMessage(samplePayload))
            val input2 = Unpooled.wrappedBuffer(anotherPayload)

            // Description matches what is verified: one decoded frame, then an error for the garbage.
            it("should yield decoded input frame and error") {
                val cut = createInstance()
                cut.decode(input1)
                        .test()
                        .expectNextMatches { frame -> frame.payloadSize == samplePayload.size }
                        .verifyComplete()
                cut.decode(input2).test()
                        .verifyError(WireFrameException::class.java)
            }

            it("should release memory for 1st input") {
                verifyMemoryReleased(input1)
            }

            it("should leave memory unreleased for 2nd input") {
                verifyMemoryNotReleased(input2)
            }
        }

        given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
            val frame1 = encoder.encode(WireFrameMessage(samplePayload))
            val frame2 = encoder.encode(WireFrameMessage(anotherPayload))

            // writeBytes(frame2, 3) copies 3 bytes and advances frame2's reader index, so the
            // later writeBytes(frame2) copies only the remaining bytes of the 2nd frame.
            val input1 = Unpooled.buffer()
                    .writeBytes(frame1)
                    .writeBytes(frame2, 3)
            val input2 = Unpooled.buffer().writeBytes(frame2)

            it("should yield decoded input frames") {
                val cut = createInstance()
                cut.decode(input1).test()
                        .expectNextMatches { frame -> frame.payloadSize == samplePayload.size }
                        .verifyComplete()
                cut.decode(input2).test()
                        .expectNextMatches { frame -> frame.payloadSize == anotherPayload.size }
                        .verifyComplete()
            }

            it("should release memory") {
                verifyMemoryReleased(input1, input2)
            }
        }

        given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
            val frame1 = encoder.encode(WireFrameMessage(samplePayload))
            val frame2 = encoder.encode(WireFrameMessage(anotherPayload))

            // Only the first 5 bytes of frame1 go into the 1st input; the rest of frame1 and the
            // whole of frame2 go into the 2nd input.
            val input1 = Unpooled.buffer()
                    .writeBytes(frame1, 5)
            val input2 = Unpooled.buffer()
                    .writeBytes(frame1)
                    .writeBytes(frame2)

            it("should yield decoded input frames") {
                val cut = createInstance()
                // The 1st chunk alone must complete without emitting any frame.
                cut.decode(input1).test()
                        .verifyComplete()
                cut.decode(input2).test()
                        .expectNextMatches { frame -> frame.payloadSize == samplePayload.size }
                        .expectNextMatches { frame -> frame.payloadSize == anotherPayload.size }
                        .verifyComplete()
            }

            it("should release memory") {
                verifyMemoryReleased(input1, input2)
            }
        }
    }
})