import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
*/
class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- fun encode(frame: PayloadWireFrameMessage): ByteBuf {
- val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
-
- bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
- bb.writeByte(frame.versionMajor.toInt())
- bb.writeByte(frame.versionMinor.toInt())
- bb.writeZero(RESERVED_BYTE_COUNT)
- bb.writeByte(frame.payloadTypeRaw.toInt())
- bb.writeInt(frame.payloadSize)
- frame.payload.writeTo(bb)
-
- return bb
- }
+ fun encode(frame: WireFrameMessage): ByteBuf = allocator
+ .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size())
+ .run {
+ writeByte(WireFrameMessage.MARKER_BYTE.toInt())
+ writeByte(frame.versionMajor.toInt())
+ writeByte(frame.versionMinor.toInt())
+ writeZero(RESERVED_BYTE_COUNT)
+ writeShort(frame.payloadType)
+ writeInt(frame.payloadSize)
+ }
+ .also {
+ frame.payload.writeTo(it)
+ }
}
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class WireFrameDecoder {
+class WireFrameDecoder(private val maxPayloadSizeBytes: Int) {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
- isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
else -> parseWireFrame(byteBuf)
}
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
-
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
-
- private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
- byteBuf.markReaderIndex()
- val byte = byteBuf.readUnsignedByte()
-
- return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
- Right(EndOfTransmissionMessage)
- } else {
- byteBuf.resetReaderIndex()
- Left(MissingWireFrameHeaderBytes)
- }
- }
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE
private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
byteBuf.markReaderIndex()
val mark = byteBuf.readUnsignedByte()
return when (mark) {
- EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
- PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
else -> {
byteBuf.resetReaderIndex()
Left(InvalidWireFrameMarker(mark))
}
}
- private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
val versionMajor = byteBuf.readUnsignedByte()
val versionMinor = byteBuf.readUnsignedByte()
- byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
- val payloadTypeRaw = byteBuf.readUnsignedByte()
+ byteBuf.skipBytes(RESERVED_BYTE_COUNT)
+ val payloadTypeRaw = byteBuf.readUnsignedShort()
val payloadSize = byteBuf.readInt()
- if (payloadSize > MAX_PAYLOAD_SIZE) {
+ if (payloadSize > maxPayloadSizeBytes) {
byteBuf.resetReaderIndex()
- return Left(PayloadSizeExceeded)
+ return Left(PayloadSizeExceeded(maxPayloadSizeBytes))
}
if (byteBuf.readableBytes() < payloadSize) {
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
+ return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
}
}