2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.impl.wire
22 import io.netty.buffer.ByteBuf
23 import io.netty.buffer.Unpooled
24 import io.netty.buffer.UnpooledByteBufAllocator
25 import org.assertj.core.api.Assertions.assertThat
26 import org.jetbrains.spek.api.Spek
27 import org.jetbrains.spek.api.dsl.describe
28 import org.jetbrains.spek.api.dsl.given
29 import org.jetbrains.spek.api.dsl.it
30 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
31 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
32 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
33 import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
34 import reactor.test.test
37 * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
40 internal object WireChunkDecoderTest : Spek({
41 val alloc = UnpooledByteBufAllocator.DEFAULT
42 val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
43 val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
45 val encoder = WireFrameEncoder(alloc)
47 fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
49 fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc))
51 fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
52 for (bb in byteBuffers) {
53 assertThat(bb.refCnt())
54 .describedAs("should be released: $bb ref count")
59 fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
60 for (bb in byteBuffers) {
61 assertThat(bb.refCnt())
62 .describedAs("should not be released: $bb ref count")
67 describe("decoding wire protocol") {
68 given("empty input") {
69 val input = Unpooled.EMPTY_BUFFER
71 it("should yield empty result") {
72 createInstance().decode(input).test().verifyComplete()
76 given("input with no readable bytes") {
77 val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
79 it("should yield empty result") {
80 createInstance().decode(input).test().verifyComplete()
83 it("should release memory") {
84 verifyMemoryReleased(input)
88 given("invalid input (not starting with marker)") {
89 val input = Unpooled.wrappedBuffer(samplePayload)
91 it("should yield error") {
92 createInstance().decode(input).test()
93 .verifyError(WireFrameException::class.java)
96 it("should leave memory unreleased") {
97 verifyMemoryNotReleased(input)
101 given("valid input") {
102 val input = WireFrameMessage(samplePayload)
104 it("should yield decoded input frame") {
105 createInstance().decode(input).test()
106 .expectNextMatches { it.payloadSize == samplePayload.size }
111 given("valid input with part of next frame") {
112 val input = Unpooled.buffer()
113 .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
114 .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
116 it("should yield decoded input frame") {
117 createInstance().decode(input).test()
118 .expectNextMatches { it.payloadSize == samplePayload.size }
122 it("should leave memory unreleased") {
123 verifyMemoryNotReleased(input)
127 given("valid input with garbage after it") {
128 val input = Unpooled.buffer()
129 .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
130 .writeBytes(Unpooled.wrappedBuffer(samplePayload))
132 it("should yield decoded input frame and error") {
133 createInstance().decode(input).test()
134 .expectNextMatches { it.payloadSize == samplePayload.size }
135 .verifyError(WireFrameException::class.java)
138 it("should leave memory unreleased") {
139 verifyMemoryNotReleased(input)
143 given("two inputs containing two separate messages") {
144 val input1 = encoder.encode(WireFrameMessage(samplePayload))
145 val input2 = encoder.encode(WireFrameMessage(anotherPayload))
147 it("should yield decoded input frames") {
148 val cut = createInstance()
149 cut.decode(input1).test()
150 .expectNextMatches { it.payloadSize == samplePayload.size }
152 cut.decode(input2).test()
153 .expectNextMatches { it.payloadSize == anotherPayload.size }
157 it("should release memory") {
158 verifyMemoryReleased(input1, input2)
162 given("1st input containing 1st frame and 2nd input containing garbage") {
163 val input1 = encoder.encode(WireFrameMessage(samplePayload))
164 val input2 = Unpooled.wrappedBuffer(anotherPayload)
166 it("should yield decoded input frames") {
167 val cut = createInstance()
170 .expectNextMatches { it.payloadSize == samplePayload.size }
172 cut.decode(input2).test()
173 .verifyError(WireFrameException::class.java)
176 it("should release memory for 1st input") {
177 verifyMemoryReleased(input1)
180 it("should leave memory unreleased for 2nd input") {
181 verifyMemoryNotReleased(input2)
186 given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
187 val frame1 = encoder.encode(WireFrameMessage(samplePayload))
188 val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
190 val input1 = Unpooled.buffer()
192 .writeBytes(frame2, 3)
193 val input2 = Unpooled.buffer().writeBytes(frame2)
195 it("should yield decoded input frames") {
196 val cut = createInstance()
197 cut.decode(input1).test()
198 .expectNextMatches { it.payloadSize == samplePayload.size }
200 cut.decode(input2).test()
201 .expectNextMatches { it.payloadSize == anotherPayload.size }
205 it("should release memory") {
206 verifyMemoryReleased(input1, input2)
210 given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
211 val frame1 = encoder.encode(WireFrameMessage(samplePayload))
212 val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
214 val input1 = Unpooled.buffer()
215 .writeBytes(frame1, 5)
216 val input2 = Unpooled.buffer()
220 it("should yield decoded input frames") {
221 val cut = createInstance()
222 cut.decode(input1).test()
224 cut.decode(input2).test()
225 .expectNextMatches { it.payloadSize == samplePayload.size }
226 .expectNextMatches { it.payloadSize == anotherPayload.size }
230 it("should release memory") {
231 verifyMemoryReleased(input1, input2)