Reject messages with payload size > 1MiB 41/58641/1
authorJakub Dudycz <jdudycz@nokia.com>
Thu, 28 Jun 2018 10:41:29 +0000 (12:41 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 09:14:32 +0000 (11:14 +0200)
- Update validation in WireFrameDecoder class
- Write unit and component tests for that case

Closes ONAP-340

Change-Id: I68cb608fd76118719b12a83de1ef930160f8a162
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt

index 49eedda..d917c71 100644 (file)
@@ -57,9 +57,11 @@ object VesHvSpecification : Spek({
             val validMessage = vesMessage(Domain.HVRANMEAS)
             val msgWithInvalidDomain = vesMessage(Domain.OTHER)
             val msgWithInvalidFrame = invalidWireFrame()
+            val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
             val expectedRefCnt = 0
 
-            val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
+            val handledEvents = sut.handleConnection(
+                    sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
 
             assertThat(handledEvents).hasSize(1)
 
@@ -72,6 +74,9 @@ object VesHvSpecification : Spek({
             assertThat(msgWithInvalidFrame.refCnt())
                     .describedAs("message with invalid frame should be released")
                     .isEqualTo(expectedRefCnt)
+            assertThat(msgWithTooBigPayload.refCnt())
+                    .describedAs("message with payload exceeding 1MiB should be released")
+                    .isEqualTo(expectedRefCnt)
 
         }
 
@@ -148,4 +153,20 @@ object VesHvSpecification : Spek({
             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
         }
     }
+
+    describe("request validation") {
+        it("should reject message with payload greater than 1 MiB and all subsequent messages") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val handledMessages = sut.handleConnection(sink,
+                    vesMessage(Domain.HVRANMEAS, "first"),
+                    vesMessageWithTooBigPayload(Domain.HVRANMEAS, "second"),
+                    vesMessage(Domain.HVRANMEAS, "third"))
+
+            assertThat(handledMessages).hasSize(1)
+            assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
+        }
+    }
 })
index 8895d64..e620e6b 100644 (file)
 package org.onap.dcae.collectors.veshv.tests.component
 
 import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import io.netty.buffer.PooledByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.*
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+import org.onap.ves.HVRanMeasFieldsV5
 import org.onap.ves.VesEventV5.VesEvent
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -29,7 +33,7 @@ import java.util.*
 
 val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
 
-fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = allocator.buffer().run {
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
     writeByte(0xFF) // always 0xFF
     writeByte(0x01)   // version
     writeByte(0x01)   // content type = GPB
@@ -40,7 +44,7 @@ fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toS
 }
 
 
-fun invalidVesMessage() = allocator.buffer().run {
+fun invalidVesMessage(): ByteBuf = allocator.buffer().run {
     writeByte(0xFF) // always 0xFF
     writeByte(0x01)   // version
     writeByte(0x01)   // content type = GPB
@@ -51,17 +55,32 @@ fun invalidVesMessage() = allocator.buffer().run {
 
 }
 
-fun garbageFrame() = allocator.buffer().run {
+fun garbageFrame(): ByteBuf = allocator.buffer().run {
     writeBytes("the meaning of life is &@)(*_!".toByteArray())
 }
 
-fun invalidWireFrame() = allocator.buffer().run {
+fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
     writeByte(0xFF)
     writeByte(0x01)   // version
     writeByte(0x01)   // content type = GPB
 }
 
-fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) =
+fun vesMessageWithTooBigPayload(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()): ByteBuf = allocator.buffer().run {
+    writeByte(0xFF) // always 0xFF
+    writeByte(0x01)   // version
+    writeByte(0x01)   // content type = GPB
+
+    val gpb = vesEvent(
+            domain,
+            id,
+            ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+    ).toByteString().asReadOnlyByteBuffer()
+
+    writeInt(gpb.limit())  // ves event size in bytes
+    writeBytes(gpb)  // ves event as GPB bytes
+}
+
+fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY) =
         VesEvent.newBuilder()
                 .setCommonEventHeader(
                         CommonEventHeader.getDefaultInstance().toBuilder()
@@ -76,5 +95,5 @@ fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().t
                                 .setStartEpochMicrosec(120034455)
                                 .setLastEpochMicrosec(120034459)
                                 .setSequence(1))
-                .setHvRanMeasFields(ByteString.EMPTY)
+                .setHvRanMeasFields(hvRanMeasFields)
                 .build()
index 22767ed..39841d6 100644 (file)
@@ -52,9 +52,9 @@ class WireFrameDecoder {
 
     fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
             when {
-                isEmpty(byteBuf)          -> Left(EmptyWireFrame)
+                isEmpty(byteBuf) -> Left(EmptyWireFrame)
                 headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
-                else                      -> parseFrame(byteBuf)
+                else -> parseFrame(byteBuf)
             }
 
     private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
@@ -74,6 +74,11 @@ class WireFrameDecoder {
         val payloadTypeRaw = byteBuf.readUnsignedByte()
 
         val payloadSize = byteBuf.readInt()
+
+        if (payloadSize > MAX_PAYLOAD_SIZE) {
+            return Left(PayloadSizeExceeded)
+        }
+
         if (byteBuf.readableBytes() < payloadSize) {
             byteBuf.resetReaderIndex()
             return Left(MissingWireFramePayloadBytes)
@@ -83,4 +88,8 @@ class WireFrameDecoder {
 
         return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
     }
+
+    companion object {
+        const val MAX_PAYLOAD_SIZE = 1024 * 1024
+    }
 }
index fb22520..626bf32 100644 (file)
@@ -19,6 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.domain
 
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
+
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
@@ -35,6 +37,7 @@ class InvalidWireFrameMarker(actualMarker: Short)
     : InvalidWireFrame(
         "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
 
+object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
 
 // Missing bytes errors
 
index a97d889..4d6f071 100644 (file)
@@ -30,7 +30,9 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder.Companion.MAX_PAYLOAD_SIZE
 import java.nio.charset.Charset
+import kotlin.test.assertTrue
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -153,7 +155,7 @@ object WireFrameCodecsTest : Spek({
                 assertThat(buff.readableBytes()).isEqualTo(1)
             }
 
-            it("should throw exception when not even header fits") {
+            it("should return error when not even header fits") {
                 val buff = Unpooled.buffer()
                         .writeByte(0xFF)
 
@@ -161,7 +163,7 @@ object WireFrameCodecsTest : Spek({
 
             }
 
-            it("should throw exception when first byte is not 0xFF but length looks ok") {
+            it("should return error when first byte is not 0xFF but length looks ok") {
                 val buff = Unpooled.buffer()
                         .writeByte(0xAA)
                         .writeBytes("some garbage".toByteArray())
@@ -169,14 +171,14 @@ object WireFrameCodecsTest : Spek({
                 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
             }
 
-            it("should throw exception when first byte is not 0xFF and length is to short") {
+            it("should return error when first byte is not 0xFF and length is to short") {
                 val buff = Unpooled.buffer()
                         .writeByte(0xAA)
 
                 decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
             }
 
-            it("should throw exception when payload doesn't fit") {
+            it("should return error when payload doesn't fit") {
                 val buff = Unpooled.buffer()
                         .writeBytes(encodeSampleFrame())
                 buff.writerIndex(buff.writerIndex() - 2)
@@ -185,8 +187,50 @@ object WireFrameCodecsTest : Spek({
             }
 
         }
-    }
 
+        describe("payload size limit"){
+
+            it("should decode successfully when payload size is equal 1 MiB") {
+
+                val payload = ByteArray(MAX_PAYLOAD_SIZE)
+                val input = WireFrame(
+                        payload = ByteData(payload),
+                        version = 1,
+                        payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                        payloadSize = payload.size)
+
+
+                assertTrue(decoder.decodeFirst(encoder.encode(input)).isRight())
+            }
+
+            it("should return error when payload exceeds 1 MiB") {
+
+                val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
+                val input = WireFrame(
+                        payload = ByteData(payload),
+                        version = 1,
+                        payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                        payloadSize = payload.size)
+
+
+                decoder.decodeFirst(encoder.encode(input))
+                        .assertFailedWithError { it.isInstanceOf(PayloadSizeExceeded::class.java) }
+            }
+
+            it("should validate only first message") {
+
+                val payload = ByteArray(MAX_PAYLOAD_SIZE)
+                val input = WireFrame(
+                        payload = ByteData(payload),
+                        version = 1,
+                        payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+                        payloadSize = payload.size)
+
+
+                assertTrue(decoder.decodeFirst(encoder.encode(input).writeByte(0xFF)).isRight())
+            }
+        }
+    }
 })
 
 private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) {