2 * ============LICENSE_START=======================================================
3 * dcaegen2-collectors-veshv
4 * ================================================================================
5 * Copyright (C) 2018 NOKIA
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.collectors.veshv.domain
22 import arrow.core.Either
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.Unpooled
25 import io.netty.buffer.UnpooledByteBufAllocator
26 import org.assertj.core.api.Assertions.assertThat
27 import org.assertj.core.api.ObjectAssert
28 import org.jetbrains.spek.api.Spek
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.given
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
33 import java.nio.charset.Charset
34 import kotlin.test.assertTrue
35 import kotlin.test.fail
38 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
// Spek specification covering PayloadWireFrameMessage validation (isValid) and the behaviour of
// WireFrameEncoder / WireFrameDecoder when frames are encoded and then decoded from a ByteBuf.
41 object WireFrameCodecsTest : Spek({
// Shared fixtures: a sample textual payload and a single encoder/decoder pair used by the tests below.
42 val payloadAsString = "coffeebabe"
43 val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT)
44 val decoder = WireFrameDecoder()
// Builds a payload frame from payloadAsString converted to bytes with the platform default charset.
46 fun createSampleFrame() =
47 PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
// Derives its result from createSampleFrame(). Callers below hand the result to decoder.decodeFirst and
// ByteBuf.writeBytes, so it presumably yields the sample frame in encoded form - TODO confirm.
49 fun encodeSampleFrame() =
50 createSampleFrame().let {
// Validation rules, each checked through PayloadWireFrameMessage.isValid().
54 describe("Wire Frame invariants") {
56 given("input with unsupported major version") {
57 val input = PayloadWireFrameMessage(
58 payload = ByteData.EMPTY,
61 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
64 it("should fail validation") {
65 assertThat(input.isValid()).isFalse()
69 given("input with unsupported minor version") {
70 val input = PayloadWireFrameMessage(
71 payload = ByteData.EMPTY,
74 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
// Unlike the major-version case above, this frame is expected to be accepted.
77 it("should pass validation") {
78 assertThat(input.isValid()).isTrue()
82 given("input with unsupported payload type") {
83 val input = PayloadWireFrameMessage(
84 payload = ByteData.EMPTY,
87 payloadTypeRaw = 0x69,
90 it("should fail validation") {
91 assertThat(input.isValid()).isFalse()
95 given("input with too small payload size") {
96 val input = PayloadWireFrameMessage(
97 payload = ByteData(byteArrayOf(1, 2, 3)),
100 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
103 it("should fail validation") {
104 assertThat(input.isValid()).isFalse()
108 given("input with too big payload size") {
109 val input = PayloadWireFrameMessage(
110 payload = ByteData(byteArrayOf(1, 2, 3)),
113 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
116 it("should fail validation") {
117 assertThat(input.isValid()).isFalse()
// Positive case: the declared payloadSize is taken from the actual payload array.
121 given("valid input") {
122 val payload = byteArrayOf(6, 9, 8, 6)
123 val input = PayloadWireFrameMessage(
124 payload = ByteData(payload),
127 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
128 payloadSize = payload.size)
130 it("should pass validation") {
131 assertThat(input.isValid()).isTrue()
// Encoder and decoder behaviour.
138 describe("Wire Frame codec") {
// Round trip: the decoded sample frame is compared field by field with a freshly created sample frame.
140 describe("encode-decode methods' compatibility") {
141 val frame = createSampleFrame()
142 val encoded = encodeSampleFrame()
143 val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
145 it("should decode version") {
146 assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor)
149 it("should decode payload type") {
150 assertThat(decoded.payloadTypeRaw).isEqualTo(frame.payloadTypeRaw)
153 it("should decode payload size") {
154 assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize)
157 it("should decode payload") {
158 assertThat(decoded.payload.asString())
159 .isEqualTo(payloadAsString)
164 describe("TCP framing") {
165 // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
167 it("should return error when buffer is empty") {
168 val buff = Unpooled.buffer()
170 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(EmptyWireFrame::class.java) }
// After a failed decode the buffer must still have refCnt 1 and reader index 0 (see assertBufferIntact).
171 assertBufferIntact(buff)
174 it("should return end-of-transmission message when given end-of-transmission marker byte") {
175 val buff = Unpooled.buffer()
178 assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
181 it("should return error when given any single byte other than end-of-transmission marker byte") {
182 val buff = Unpooled.buffer()
185 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
186 assertBufferIntact(buff)
189 it("should return error when payload message header does not fit") {
190 val buff = Unpooled.buffer()
192 .writeBytes("MOMOM".toByteArray())
194 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
195 assertBufferIntact(buff)
198 it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
199 val buff = Unpooled.buffer()
201 .writeBytes("some garbage".toByteArray())
203 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
204 assertBufferIntact(buff)
207 it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
208 val buff = Unpooled.buffer()
210 .writeBytes("some garbage".toByteArray())
212 assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
215 it("should return error when payload doesn't fit") {
216 val buff = Unpooled.buffer()
217 .writeBytes(encodeSampleFrame())
// Move the writer index back by two so the last two written bytes are no longer readable.
218 buff.writerIndex(buff.writerIndex() - 2)
220 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
221 assertBufferIntact(buff)
224 it("should decode payload message leaving rest unread") {
225 val buff = Unpooled.buffer()
226 .writeBytes(encodeSampleFrame())
228 val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
230 assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
// Exactly one readable byte is expected to remain in the buffer after the frame has been decoded.
231 assertThat(buff.readableBytes()).isEqualTo(1)
// Boundary tests around PayloadWireFrameMessage.MAX_PAYLOAD_SIZE.
235 describe("payload size limit") {
237 it("should decode successfully when payload size is equal 1 MiB") {
// Payload of exactly MAX_PAYLOAD_SIZE bytes: decoding must yield a Right.
239 val payload = ByteArray(MAX_PAYLOAD_SIZE)
240 val input = PayloadWireFrameMessage(
241 payload = ByteData(payload),
244 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
245 payloadSize = payload.size)
248 assertTrue(decoder.decodeFirst(encoder.encode(input)).isRight())
251 it("should return error when payload exceeds 1 MiB") {
// One byte over MAX_PAYLOAD_SIZE: decoding must fail with PayloadSizeExceeded.
253 val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
254 val input = PayloadWireFrameMessage(
255 payload = ByteData(payload),
258 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
259 payloadSize = payload.size)
260 val buff = encoder.encode(input)
262 decoder.decodeFirst(buff)
263 .assertFailedWithError { it.isInstanceOf(PayloadSizeExceeded::class.java) }
264 assertBufferIntact(buff)
267 it("should validate only first message") {
269 val payload = ByteArray(MAX_PAYLOAD_SIZE)
270 val input = PayloadWireFrameMessage(
271 payload = ByteData(payload),
274 payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
275 payloadSize = payload.size)
// A single extra 0xFF byte follows the full-size frame; decoding the first frame must still succeed.
278 assertTrue(decoder.decodeFirst(encoder.encode(input).writeByte(0xFF)).isRight())
/**
 * Asserts that [buff] has not been consumed or released: its reference count must be 1
 * and its reader index must be 0.
 */
private fun assertBufferIntact(buff: ByteBuf) {
    with(buff) {
        assertThat(refCnt())
                .describedAs("buffer should not be released")
                .isEqualTo(1)
        assertThat(readerIndex())
                .describedAs("buffer reader index should be intact")
                .isEqualTo(0)
    }
}
/**
 * Asserts that this [Either] is a left value and runs [assertj] against an AssertJ
 * assertion object wrapping that value. Fails the test with "Error expected" when
 * this is a right value.
 */
private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) {
    fold(
            { error -> assertj(assertThat(error)) },
            { _ -> fail("Error expected") })
}
/**
 * Unwraps a decoding result into a [PayloadWireFrameMessage].
 *
 * Fails the test with the error's message when decoding failed, or (via
 * [castToPayloadMsgOrFail]) when the decoded message is of another type.
 */
private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
        fold(
                { decodingError -> fail(decodingError.message) },
                { message -> message.castToPayloadMsgOrFail() })
/**
 * Returns this message as a [PayloadWireFrameMessage], or fails the test with a message
 * naming the actual class when it is of any other type.
 */
private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
        when (this) {
            is PayloadWireFrameMessage -> this
            else -> fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
        }
/**
 * Asserts that [decoded] holds an [EndOfTransmissionMessage].
 *
 * Delegates to [getEndOfTransmissionMessageOrFail] and discards the extracted message;
 * only the failure raised for any other outcome matters here.
 */
private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
    decoded.getEndOfTransmissionMessageOrFail()
}
/**
 * Unwraps a decoding result into an [EndOfTransmissionMessage].
 *
 * Fails the test with the error's message when decoding failed, or (via
 * [castToEndOfTransmissionMessageOrFail]) when the decoded message is of another type.
 */
private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
        fold(
                { decodingError -> fail(decodingError.message) },
                { message -> message.castToEndOfTransmissionMessageOrFail() })
/**
 * Returns this message as an [EndOfTransmissionMessage], or fails the test with a message
 * naming the actual class when it is of any other type.
 */
private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
        if (this is EndOfTransmissionMessage) {
            this
        } else {
            fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
        }