Align with latest proposal of wire proto frame
[dcaegen2/collectors/hv-ves.git] / hv-collector-domain / src / test / kotlin / org / onap / dcae / collectors / veshv / domain / WireFrameCodecsTest.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.domain
21
22 import arrow.core.Either
23 import io.netty.buffer.ByteBuf
24 import io.netty.buffer.Unpooled
25 import io.netty.buffer.UnpooledByteBufAllocator
26 import org.assertj.core.api.Assertions.assertThat
27 import org.assertj.core.api.ObjectAssert
28 import org.jetbrains.spek.api.Spek
29 import org.jetbrains.spek.api.dsl.describe
30 import org.jetbrains.spek.api.dsl.given
31 import org.jetbrains.spek.api.dsl.it
32 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
33 import java.nio.charset.Charset
34 import kotlin.test.assertTrue
35 import kotlin.test.fail
36
37 /**
38  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
39  * @since June 2018
40  */
object WireFrameCodecsTest : Spek({
    // Specification of the wire frame format: the validation rules exposed by
    // PayloadWireFrameMessage.isValid() and the behaviour of WireFrameEncoder and
    // WireFrameDecoder when they are used together.
    val sampleText = "coffeebabe"
    val frameEncoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT)
    val frameDecoder = WireFrameDecoder()
    val protobufPayloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue

    // Frame built from the sample text via the single-argument factory/constructor.
    fun createSampleFrame() =
            sampleText.toByteArray(Charset.defaultCharset()).let { PayloadWireFrameMessage(it) }

    // Encoded form of the sample frame; a fresh buffer is produced on every call.
    fun encodeSampleFrame() = frameEncoder.encode(createSampleFrame())

    // Frame carrying `size` zero bytes whose declared payload size matches the payload.
    fun zeroFilledFrame(size: Int) =
            ByteArray(size).let { bytes ->
                PayloadWireFrameMessage(
                        payload = ByteData(bytes),
                        versionMajor = 1,
                        versionMinor = 0,
                        payloadTypeRaw = protobufPayloadType,
                        payloadSize = bytes.size)
            }

    describe("Wire Frame invariants") {

        given("input with unsupported major version") {
            val message = PayloadWireFrameMessage(
                    payload = ByteData.EMPTY,
                    versionMajor = 100,
                    versionMinor = 0,
                    payloadTypeRaw = protobufPayloadType,
                    payloadSize = 0)

            it("should fail validation") {
                assertThat(message.isValid()).isFalse()
            }
        }

        given("input with unsupported minor version") {
            // Only the major version is expected to influence validity here.
            val message = PayloadWireFrameMessage(
                    payload = ByteData.EMPTY,
                    versionMajor = 1,
                    versionMinor = 6,
                    payloadTypeRaw = protobufPayloadType,
                    payloadSize = 0)

            it("should pass validation") {
                assertThat(message.isValid()).isTrue()
            }
        }

        given("input with unsupported payload type") {
            val message = PayloadWireFrameMessage(
                    payload = ByteData.EMPTY,
                    versionMajor = 1,
                    versionMinor = 0,
                    payloadTypeRaw = 0x69,
                    payloadSize = 0)

            it("should fail validation") {
                assertThat(message.isValid()).isFalse()
            }
        }

        given("input with too small payload size") {
            // Declared size (1) is smaller than the three bytes actually carried.
            val message = PayloadWireFrameMessage(
                    payload = ByteData(byteArrayOf(1, 2, 3)),
                    versionMajor = 1,
                    versionMinor = 0,
                    payloadTypeRaw = protobufPayloadType,
                    payloadSize = 1)

            it("should fail validation") {
                assertThat(message.isValid()).isFalse()
            }
        }

        given("input with too big payload size") {
            // Declared size (8) is larger than the three bytes actually carried.
            val message = PayloadWireFrameMessage(
                    payload = ByteData(byteArrayOf(1, 2, 3)),
                    versionMajor = 1,
                    versionMinor = 0,
                    payloadTypeRaw = protobufPayloadType,
                    payloadSize = 8)

            it("should fail validation") {
                assertThat(message.isValid()).isFalse()
            }
        }

        given("valid input") {
            val bytes = byteArrayOf(6, 9, 8, 6)
            val message = PayloadWireFrameMessage(
                    payload = ByteData(bytes),
                    versionMajor = 1,
                    versionMinor = 0,
                    payloadTypeRaw = protobufPayloadType,
                    payloadSize = bytes.size)

            it("should pass validation") {
                assertThat(message.isValid()).isTrue()
            }
        }

    }

    describe("Wire Frame codec") {

        describe("encode-decode methods' compatibility") {
            val expected = createSampleFrame()
            val roundTripped = frameDecoder.decodeFirst(encodeSampleFrame()).getPayloadMessageOrFail()

            it("should decode version") {
                assertThat(roundTripped.versionMajor).isEqualTo(expected.versionMajor)
            }

            it("should decode payload type") {
                assertThat(roundTripped.payloadTypeRaw).isEqualTo(expected.payloadTypeRaw)
            }

            it("should decode payload size") {
                assertThat(roundTripped.payloadSize).isEqualTo(expected.payloadSize)
            }

            it("should decode payload") {
                val decodedText = roundTripped.payload.asString()

                assertThat(decodedText).isEqualTo(sampleText)
            }
        }

        describe("TCP framing") {
            // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11

            it("should return error when buffer is empty") {
                val emptyBuffer = Unpooled.buffer()
                val outcome = frameDecoder.decodeFirst(emptyBuffer)

                outcome.assertFailedWithError { error -> error.isInstanceOf(EmptyWireFrame::class.java) }
                assertBufferIntact(emptyBuffer)
            }

            it("should return end-of-transmission message when given end-of-transmission marker byte") {
                val markerOnly = Unpooled.buffer()
                markerOnly.writeByte(0xAA)

                val outcome = frameDecoder.decodeFirst(markerOnly)

                assertIsEndOfTransmissionMessage(outcome)
            }

            it("should return error when given any single byte other than end-of-transmission marker byte") {
                val singleByte = Unpooled.buffer()
                singleByte.writeByte(0xEE)

                val outcome = frameDecoder.decodeFirst(singleByte)

                outcome.assertFailedWithError { error ->
                    error.isInstanceOf(MissingWireFrameHeaderBytes::class.java)
                }
                assertBufferIntact(singleByte)
            }

            it("should return error when payload message header does not fit") {
                val partialHeader = Unpooled.buffer()
                partialHeader.writeByte(0xFF)
                partialHeader.writeBytes("MOMOM".toByteArray())

                val outcome = frameDecoder.decodeFirst(partialHeader)

                outcome.assertFailedWithError { error ->
                    error.isInstanceOf(MissingWireFrameHeaderBytes::class.java)
                }
                assertBufferIntact(partialHeader)
            }

            it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
                val badMarker = Unpooled.buffer()
                badMarker.writeByte(0x69)
                badMarker.writeBytes("some garbage".toByteArray())

                val outcome = frameDecoder.decodeFirst(badMarker)

                outcome.assertFailedWithError { error ->
                    error.isInstanceOf(InvalidWireFrameMarker::class.java)
                }
                assertBufferIntact(badMarker)
            }

            it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
                val markerWithTrailer = Unpooled.buffer()
                markerWithTrailer.writeByte(0xAA)
                markerWithTrailer.writeBytes("some garbage".toByteArray())

                val outcome = frameDecoder.decodeFirst(markerWithTrailer)

                assertIsEndOfTransmissionMessage(outcome)
            }

            it("should return error when payload doesn't fit") {
                val truncated = Unpooled.buffer()
                truncated.writeBytes(encodeSampleFrame())
                // Hide the last two written bytes so the payload is incomplete.
                truncated.writerIndex(truncated.writerIndex() - 2)

                val outcome = frameDecoder.decodeFirst(truncated)

                outcome.assertFailedWithError { error ->
                    error.isInstanceOf(MissingWireFramePayloadBytes::class.java)
                }
                assertBufferIntact(truncated)
            }

            it("should decode payload message leaving rest unread") {
                val frameThenMarker = Unpooled.buffer()
                frameThenMarker.writeBytes(encodeSampleFrame())
                frameThenMarker.writeByte(0xAA)

                val message = frameDecoder.decodeFirst(frameThenMarker).getPayloadMessageOrFail()

                assertThat(message.isValid()).describedAs("should be valid").isTrue()
                // Only the trailing marker byte should remain readable.
                assertThat(frameThenMarker.readableBytes()).isEqualTo(1)
            }
        }

        describe("payload size limit") {

            it("should decode successfully when payload size is equal 1 MiB") {
                val encoded = frameEncoder.encode(zeroFilledFrame(MAX_PAYLOAD_SIZE))

                assertTrue(frameDecoder.decodeFirst(encoded).isRight())
            }

            it("should return error when payload exceeds 1 MiB") {
                val oversized = frameEncoder.encode(zeroFilledFrame(MAX_PAYLOAD_SIZE + 1))

                val outcome = frameDecoder.decodeFirst(oversized)

                outcome.assertFailedWithError { error ->
                    error.isInstanceOf(PayloadSizeExceeded::class.java)
                }
                assertBufferIntact(oversized)
            }

            it("should validate only first message") {
                val encoded = frameEncoder.encode(zeroFilledFrame(MAX_PAYLOAD_SIZE))
                // A lone payload-frame marker after the first frame must not affect decoding of the first one.
                encoded.writeByte(0xFF)

                assertTrue(frameDecoder.decodeFirst(encoded).isRight())
            }
        }
    }
})
283
/**
 * Asserts that [buff] was left untouched: its reference count is still 1 and its
 * reader index is still 0.
 */
private fun assertBufferIntact(buff: ByteBuf) {
    val referenceCount = buff.refCnt()
    val readerPosition = buff.readerIndex()

    assertThat(referenceCount).describedAs("buffer should not be released").isEqualTo(1)
    assertThat(readerPosition).describedAs("buffer reader index should be intact").isEqualTo(0)
}
288
/**
 * Expects this [Either] to be a left value and hands an AssertJ assertion on that
 * value to [assertj]; fails the test with "Error expected" when it is a right value.
 */
private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) {
    fold(
            { error -> assertj(assertThat(error)) },
            { fail("Error expected") })
}
292
/**
 * Unwraps a successful decoding result as a [PayloadWireFrameMessage].
 *
 * Fails the test with the decoding error's message when this is a left value, and
 * delegates to [castToPayloadMsgOrFail] for the type check otherwise.
 */
private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage {
    return fold(
            { decodingError -> fail(decodingError.message) },
            { message -> message.castToPayloadMsgOrFail() })
}
295
/**
 * Returns this message as a [PayloadWireFrameMessage], or fails the test with a
 * description of the actual runtime class when it is of any other type.
 */
private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
        when (this) {
            is PayloadWireFrameMessage -> this
            else -> fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
        }
299
300
/**
 * Asserts that [decoded] is a successful result holding an end-of-transmission message.
 *
 * Fails the test with the decoding error's message for a left value, or with a type
 * mismatch description when the decoded message is of another type.
 */
private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
    // The extracted message itself is not needed; only the failure paths matter.
    decoded.fold(
            { decodingError -> fail(decodingError.message) },
            { message -> message.castToEndOfTransmissionMessageOrFail() })
}
304
/**
 * Unwraps a successful decoding result as an [EndOfTransmissionMessage].
 *
 * Fails the test with the decoding error's message when this is a left value, and
 * delegates to [castToEndOfTransmissionMessageOrFail] for the type check otherwise.
 */
private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage {
    return fold(
            { decodingError -> fail(decodingError.message) },
            { message -> message.castToEndOfTransmissionMessageOrFail() })
}
307
/**
 * Returns this message as an [EndOfTransmissionMessage], or fails the test with a
 * description of the actual runtime class when it is of any other type.
 */
private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
        when (this) {
            is EndOfTransmissionMessage -> this
            else -> fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
        }