Reject messages with payload size > 1MiB
[dcaegen2/collectors/hv-ves.git] / hv-collector-domain / src / main / kotlin / org / onap / dcae / collectors / veshv / domain / codec.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.domain
21
22 import arrow.core.Either
23 import arrow.core.Left
24 import arrow.core.Right
25 import io.netty.buffer.ByteBuf
26 import io.netty.buffer.ByteBufAllocator
27
28 /**
29  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
30  * @since June 2018
31  */
32 class WireFrameEncoder(val allocator: ByteBufAllocator) {
33
34     fun encode(frame: WireFrame): ByteBuf {
35         val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
36
37         bb.writeByte(WireFrame.MARKER_BYTE.toInt())
38         bb.writeByte(frame.version.toInt())
39         bb.writeByte(frame.payloadTypeRaw.toInt())
40         bb.writeInt(frame.payloadSize)
41         frame.payload.writeTo(bb)
42
43         return bb
44     }
45 }
46
47 /**
48  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
49  * @since June 2018
50  */
51 class WireFrameDecoder {
52
53     fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
54             when {
55                 isEmpty(byteBuf) -> Left(EmptyWireFrame)
56                 headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
57                 else -> parseFrame(byteBuf)
58             }
59
60     private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
61
62     private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
63
64     private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
65         byteBuf.markReaderIndex()
66
67         val mark = byteBuf.readUnsignedByte()
68         if (mark != WireFrame.MARKER_BYTE) {
69             byteBuf.resetReaderIndex()
70             return Left(InvalidWireFrameMarker(mark))
71         }
72
73         val version = byteBuf.readUnsignedByte()
74         val payloadTypeRaw = byteBuf.readUnsignedByte()
75
76         val payloadSize = byteBuf.readInt()
77
78         if (payloadSize > MAX_PAYLOAD_SIZE) {
79             return Left(PayloadSizeExceeded)
80         }
81
82         if (byteBuf.readableBytes() < payloadSize) {
83             byteBuf.resetReaderIndex()
84             return Left(MissingWireFramePayloadBytes)
85         }
86
87         val payload = ByteData.readFrom(byteBuf, payloadSize)
88
89         return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
90     }
91
92     companion object {
93         const val MAX_PAYLOAD_SIZE = 1024 * 1024
94     }
95 }