Creation of server module
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / test / kotlin / org / onap / dcae / collectors / veshv / impl / wire / WireChunkDecoderTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.impl.wire
21
22 import io.netty.buffer.ByteBuf
23 import io.netty.buffer.Unpooled
24 import io.netty.buffer.UnpooledByteBufAllocator
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
31 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
32 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
33 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
34 import reactor.test.test
35
36 /**
37  * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
38  * @since May 2018
39  */
40 internal object WireChunkDecoderTest : Spek({
41     val alloc = UnpooledByteBufAllocator.DEFAULT
42     val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
43     val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
44
45     val encoder = WireFrameEncoder(alloc)
46
47     fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
48
49     fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc))
50
51     fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
52         for (bb in byteBuffers) {
53             assertThat(bb.refCnt())
54                     .describedAs("should be released: $bb ref count")
55                     .isEqualTo(0)
56         }
57     }
58
59     fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
60         for (bb in byteBuffers) {
61             assertThat(bb.refCnt())
62                     .describedAs("should not be released: $bb ref count")
63                     .isEqualTo(1)
64         }
65     }
66
67     describe("decoding wire protocol") {
68         given("empty input") {
69             val input = Unpooled.EMPTY_BUFFER
70
71             it("should yield empty result") {
72                 createInstance().decode(input).test().verifyComplete()
73             }
74         }
75
76         given("input with no readable bytes") {
77             val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
78
79             it("should yield empty result") {
80                 createInstance().decode(input).test().verifyComplete()
81             }
82
83             it("should release memory") {
84                 verifyMemoryReleased(input)
85             }
86         }
87
88         given("invalid input (not starting with marker)") {
89             val input = Unpooled.wrappedBuffer(samplePayload)
90
91             it("should yield error") {
92                 createInstance().decode(input).test()
93                         .verifyError(WireFrameException::class.java)
94             }
95
96             it("should leave memory unreleased") {
97                 verifyMemoryNotReleased(input)
98             }
99         }
100
101         given("valid input") {
102             val input = WireFrameMessage(samplePayload)
103
104             it("should yield decoded input frame") {
105                 createInstance().decode(input).test()
106                         .expectNextMatches { it.payloadSize == samplePayload.size }
107                         .verifyComplete()
108             }
109         }
110
111         given("valid input with part of next frame") {
112             val input = Unpooled.buffer()
113                     .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
114                     .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
115
116             it("should yield decoded input frame") {
117                 createInstance().decode(input).test()
118                         .expectNextMatches { it.payloadSize == samplePayload.size }
119                         .verifyComplete()
120             }
121
122             it("should leave memory unreleased") {
123                 verifyMemoryNotReleased(input)
124             }
125         }
126
127         given("valid input with garbage after it") {
128             val input = Unpooled.buffer()
129                     .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
130                     .writeBytes(Unpooled.wrappedBuffer(samplePayload))
131
132             it("should yield decoded input frame and error") {
133                 createInstance().decode(input).test()
134                         .expectNextMatches { it.payloadSize == samplePayload.size }
135                         .verifyError(WireFrameException::class.java)
136             }
137
138             it("should leave memory unreleased") {
139                 verifyMemoryNotReleased(input)
140             }
141         }
142
143         given("two inputs containing two separate messages") {
144             val input1 = encoder.encode(WireFrameMessage(samplePayload))
145             val input2 = encoder.encode(WireFrameMessage(anotherPayload))
146
147             it("should yield decoded input frames") {
148                 val cut = createInstance()
149                 cut.decode(input1).test()
150                         .expectNextMatches { it.payloadSize == samplePayload.size }
151                         .verifyComplete()
152                 cut.decode(input2).test()
153                         .expectNextMatches { it.payloadSize == anotherPayload.size }
154                         .verifyComplete()
155             }
156
157             it("should release memory") {
158                 verifyMemoryReleased(input1, input2)
159             }
160         }
161
162         given("1st input containing 1st frame and 2nd input containing garbage") {
163             val input1 = encoder.encode(WireFrameMessage(samplePayload))
164             val input2 = Unpooled.wrappedBuffer(anotherPayload)
165
166             it("should yield decoded input frames") {
167                 val cut = createInstance()
168                 cut.decode(input1)
169                         .test()
170                         .expectNextMatches { it.payloadSize == samplePayload.size }
171                         .verifyComplete()
172                 cut.decode(input2).test()
173                         .verifyError(WireFrameException::class.java)
174             }
175
176             it("should release memory for 1st input") {
177                 verifyMemoryReleased(input1)
178             }
179
180             it("should leave memory unreleased for 2nd input") {
181                 verifyMemoryNotReleased(input2)
182             }
183         }
184
185
186         given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
187             val frame1 = encoder.encode(WireFrameMessage(samplePayload))
188             val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
189
190             val input1 = Unpooled.buffer()
191                     .writeBytes(frame1)
192                     .writeBytes(frame2, 3)
193             val input2 = Unpooled.buffer().writeBytes(frame2)
194
195             it("should yield decoded input frames") {
196                 val cut = createInstance()
197                 cut.decode(input1).test()
198                         .expectNextMatches { it.payloadSize == samplePayload.size }
199                         .verifyComplete()
200                 cut.decode(input2).test()
201                         .expectNextMatches { it.payloadSize == anotherPayload.size }
202                         .verifyComplete()
203             }
204
205             it("should release memory") {
206                 verifyMemoryReleased(input1, input2)
207             }
208         }
209
210         given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
211             val frame1 = encoder.encode(WireFrameMessage(samplePayload))
212             val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
213
214             val input1 = Unpooled.buffer()
215                     .writeBytes(frame1, 5)
216             val input2 = Unpooled.buffer()
217                     .writeBytes(frame1)
218                     .writeBytes(frame2)
219
220             it("should yield decoded input frames") {
221                 val cut = createInstance()
222                 cut.decode(input1).test()
223                         .verifyComplete()
224                 cut.decode(input2).test()
225                         .expectNextMatches { it.payloadSize == samplePayload.size }
226                         .expectNextMatches { it.payloadSize == anotherPayload.size }
227                         .verifyComplete()
228             }
229
230             it("should release memory") {
231                 verifyMemoryReleased(input1, input2)
232             }
233         }
234     }
235 })