2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.wire
22 import io.netty.buffer.ByteBuf
23 import io.netty.buffer.Unpooled
24 import io.netty.buffer.UnpooledByteBufAllocator
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
31 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
32 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
33 import reactor.test.test
36 * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
39 internal object WireChunkDecoderTest : Spek({
40 val alloc = UnpooledByteBufAllocator.DEFAULT
41 val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
42 val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
44 val encoder = WireFrameEncoder(alloc)
46 fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
48 fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
50 fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
51 for (bb in byteBuffers) {
52 assertThat(bb.refCnt())
53 .describedAs("should be released: $bb ref count")
58 fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
59 for (bb in byteBuffers) {
60 assertThat(bb.refCnt())
61 .describedAs("should not be released: $bb ref count")
66 describe("decoding wire protocol") {
67 given("empty input") {
68 val input = Unpooled.EMPTY_BUFFER
70 it("should yield empty result") {
71 createInstance().decode(input).test().verifyComplete()
75 given("input with no readable bytes") {
76 val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
78 it("should yield empty result") {
79 createInstance().decode(input).test().verifyComplete()
82 it("should release memory") {
83 verifyMemoryReleased(input)
87 given("invalid input (not starting with marker)") {
88 val input = Unpooled.wrappedBuffer(samplePayload)
90 it("should yield error") {
91 createInstance().decode(input).test()
92 .verifyError(WireFrameException::class.java)
95 it("should leave memory unreleased") {
96 verifyMemoryNotReleased(input)
100 given("valid input") {
101 val input = WireFrameMessage(samplePayload)
103 it("should yield decoded input frame") {
104 createInstance().decode(input).test()
105 .expectNextMatches { it.payloadSize == samplePayload.size }
110 given("valid input with part of next frame") {
111 val input = Unpooled.buffer()
112 .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
113 .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
115 it("should yield decoded input frame") {
116 createInstance().decode(input).test()
117 .expectNextMatches { it.payloadSize == samplePayload.size }
121 it("should leave memory unreleased") {
122 verifyMemoryNotReleased(input)
126 given("valid input with garbage after it") {
127 val input = Unpooled.buffer()
128 .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
129 .writeBytes(Unpooled.wrappedBuffer(samplePayload))
131 it("should yield decoded input frame and error") {
132 createInstance().decode(input).test()
133 .expectNextMatches { it.payloadSize == samplePayload.size }
134 .verifyError(WireFrameException::class.java)
137 it("should leave memory unreleased") {
138 verifyMemoryNotReleased(input)
142 given("two inputs containing two separate messages") {
143 val input1 = encoder.encode(WireFrameMessage(samplePayload))
144 val input2 = encoder.encode(WireFrameMessage(anotherPayload))
146 it("should yield decoded input frames") {
147 val cut = createInstance()
148 cut.decode(input1).test()
149 .expectNextMatches { it.payloadSize == samplePayload.size }
151 cut.decode(input2).test()
152 .expectNextMatches { it.payloadSize == anotherPayload.size }
156 it("should release memory") {
157 verifyMemoryReleased(input1, input2)
161 given("1st input containing 1st frame and 2nd input containing garbage") {
162 val input1 = encoder.encode(WireFrameMessage(samplePayload))
163 val input2 = Unpooled.wrappedBuffer(anotherPayload)
165 it("should yield decoded input frames") {
166 val cut = createInstance()
169 .expectNextMatches { it.payloadSize == samplePayload.size }
171 cut.decode(input2).test()
172 .verifyError(WireFrameException::class.java)
175 it("should release memory for 1st input") {
176 verifyMemoryReleased(input1)
179 it("should leave memory unreleased for 2nd input") {
180 verifyMemoryNotReleased(input2)
185 given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
186 val frame1 = encoder.encode(WireFrameMessage(samplePayload))
187 val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
189 val input1 = Unpooled.buffer()
191 .writeBytes(frame2, 3)
192 val input2 = Unpooled.buffer().writeBytes(frame2)
194 it("should yield decoded input frames") {
195 val cut = createInstance()
196 cut.decode(input1).test()
197 .expectNextMatches { it.payloadSize == samplePayload.size }
199 cut.decode(input2).test()
200 .expectNextMatches { it.payloadSize == anotherPayload.size }
204 it("should release memory") {
205 verifyMemoryReleased(input1, input2)
209 given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
210 val frame1 = encoder.encode(WireFrameMessage(samplePayload))
211 val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
213 val input1 = Unpooled.buffer()
214 .writeBytes(frame1, 5)
215 val input2 = Unpooled.buffer()
219 it("should yield decoded input frames") {
220 val cut = createInstance()
221 cut.decode(input1).test()
223 cut.decode(input2).test()
224 .expectNextMatches { it.payloadSize == samplePayload.size }
225 .expectNextMatches { it.payloadSize == anotherPayload.size }
229 it("should release memory") {
230 verifyMemoryReleased(input1, input2)