Merge "Update README.md"
[dcaegen2/collectors/hv-ves.git] / hv-collector-domain / src / main / kotlin / org / onap / dcae / collectors / veshv / domain / codec.kt
1 /*
2  * ============LICENSE_START=======================================================
3  * dcaegen2-collectors-veshv
4  * ================================================================================
5  * Copyright (C) 2018 NOKIA
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.collectors.veshv.domain
21
22 import arrow.core.Either
23 import arrow.core.Left
24 import arrow.core.Right
25 import io.netty.buffer.ByteBuf
26 import io.netty.buffer.ByteBufAllocator
27 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
28 import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
29
/**
 * Serializes [PayloadWireFrameMessage] instances into Netty buffers.
 *
 * The buffer is obtained from [allocator] and is returned to the caller without being released here.
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {

    /**
     * Writes [frame] into a freshly allocated buffer.
     *
     * Fields are written in this order: marker byte, major version, minor version,
     * [RESERVED_BYTE_COUNT] zero bytes, raw payload type, payload size (as a 32-bit integer)
     * and finally the payload itself.
     *
     * @param frame the message to serialize
     * @return the buffer holding the encoded frame
     */
    fun encode(frame: PayloadWireFrameMessage): ByteBuf {
        // Initial capacity: header plus the payload's own reported size.
        val initialCapacity = PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()

        return allocator.buffer(initialCapacity).apply {
            writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
            writeByte(frame.versionMajor.toInt())
            writeByte(frame.versionMinor.toInt())
            writeZero(RESERVED_BYTE_COUNT)
            writeByte(frame.payloadTypeRaw.toInt())
            writeInt(frame.payloadSize)
            frame.payload.writeTo(this)
        }
    }
}
50
/**
 * Decodes a single wire frame from the current reader index of a Netty [ByteBuf].
 *
 * Every failure path leaves the reader index where it was before the call, so the same buffer
 * can be passed in again (for example after more bytes have been appended to it).
 *
 * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
 * @since June 2018
 */
class WireFrameDecoder {

    /**
     * Tries to decode the first frame available in [byteBuf].
     *
     * @param byteBuf buffer positioned at the expected start of a frame
     * @return [Right] with the decoded message, or [Left] with one of:
     *  - [EmptyWireFrame] when there are no readable bytes,
     *  - [MissingWireFrameHeaderBytes] when fewer bytes than a full header are readable
     *    (and the buffer is not a lone end-of-transmission marker),
     *  - [InvalidWireFrameMarker] when the first byte is not a known marker,
     *  - [PayloadSizeExceeded] when the declared payload size is negative or above [MAX_PAYLOAD_SIZE],
     *  - [MissingWireFramePayloadBytes] when the declared payload is not fully readable yet.
     */
    fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
            when {
                isEmpty(byteBuf) -> Left(EmptyWireFrame)
                isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
                // NOTE(review): an end-of-transmission marker followed by fewer than HEADER_SIZE - 1
                // bytes is reported as MissingWireFrameHeaderBytes here - confirm this is intended.
                headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
                else -> parseWireFrame(byteBuf)
            }

    private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1

    private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1

    private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE

    /**
     * Handles the one-readable-byte case: the byte is consumed only when it is the
     * end-of-transmission marker; otherwise the reader index is restored.
     */
    private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
        byteBuf.markReaderIndex()
        val byte = byteBuf.readUnsignedByte()

        return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
            Right(EndOfTransmissionMessage)
        } else {
            byteBuf.resetReaderIndex()
            Left(MissingWireFrameHeaderBytes)
        }
    }

    /**
     * Reads the marker byte and dispatches on it. The reader index is marked first so that
     * this method and [parsePayloadFrame] can rewind to the start of the frame on failure.
     */
    private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
        byteBuf.markReaderIndex()

        val mark = byteBuf.readUnsignedByte()
        return when (mark) {
            EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
            PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
            else -> {
                byteBuf.resetReaderIndex()
                Left(InvalidWireFrameMarker(mark))
            }
        }
    }

    /**
     * Parses the remainder of a payload frame; the marker byte has already been consumed and
     * the reader index mark set by [parseWireFrame] points at the start of the frame.
     */
    private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
        val versionMajor = byteBuf.readUnsignedByte()
        val versionMinor = byteBuf.readUnsignedByte()
        byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
        val payloadTypeRaw = byteBuf.readUnsignedByte()
        val payloadSize = byteBuf.readInt()

        // readInt() is signed: a size field with the top bit set shows up as a negative number.
        // Such a value would slip past both checks below and reach ByteData.readFrom with a
        // negative length, so it is rejected here as an oversized payload.
        if (payloadSize < 0 || payloadSize > MAX_PAYLOAD_SIZE) {
            byteBuf.resetReaderIndex()
            return Left(PayloadSizeExceeded)
        }

        // Header is complete but the payload is not fully readable yet: rewind to the frame start.
        if (byteBuf.readableBytes() < payloadSize) {
            byteBuf.resetReaderIndex()
            return Left(MissingWireFramePayloadBytes)
        }

        val payload = ByteData.readFrom(byteBuf, payloadSize)

        return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
    }
}
118     }
119 }