Fix wire protocol decoder refCnt issue 55/58355/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Mon, 4 Jun 2018 11:51:29 +0000 (13:51 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 1 Aug 2018 07:06:13 +0000 (09:06 +0200)
We should use retain + slice because every reactor-netty operator
automatically releases the buffer.

Change-Id: Ie0282e70fadb56d56fc410a08e036fb0ca10584c
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt [new file with mode: 0644]
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt

index 306b776..ffd59bd 100644 (file)
@@ -81,7 +81,7 @@ data class WireFrame(val payload: ByteBuf,
             val majorVersion = byteBuf.readUnsignedByte()
             val minorVersion = byteBuf.readUnsignedByte()
             val payloadSize = byteBuf.readInt()
-            val payload = byteBuf.slice()
+            val payload = byteBuf.retainedSlice()
 
             return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
         }
index af9d0b0..a3f26ce 100644 (file)
@@ -32,11 +32,11 @@ import reactor.core.publisher.Mono
  * @since May 2018
  */
 internal class VesHvCollector(
-        val wireDecoder: WireDecoder,
-        val protobufDecoder: VesDecoder,
-        val validator: MessageValidator,
-        val router: Router,
-        val sink: Sink) : Collector {
+        private val wireDecoder: WireDecoder,
+        private val protobufDecoder: VesDecoder,
+        private val validator: MessageValidator,
+        private val router: Router,
+        private val sink: Sink) : Collector {
     override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
             dataStream
                     .flatMap(this::decodeWire)
@@ -47,7 +47,7 @@ internal class VesHvCollector(
                     .doOnNext(this::releaseMemory)
                     .then()
 
-    private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode)
+    private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode)
 
     private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
 
@@ -71,6 +71,11 @@ internal class VesHvCollector(
         msg.rawMessage.release()
     }
 
+
+
+    private fun <T>omitWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> =
+            Mono.justOrEmpty(mapper(input))
+
     private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
         val result = mapper(input)
         return if (result == null) {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
new file mode 100644 (file)
index 0000000..5a923c4
--- /dev/null
@@ -0,0 +1,55 @@
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.Unpooled
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+object WireFrameTest : Spek({
+    describe("Wire Frame codec") {
+        describe("encode-decode methods' compatibility") {
+            val payloadContent = "test"
+            val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII))
+            val frame = WireFrame(payload = payload,
+                    majorVersion = 1,
+                    minorVersion = 2,
+                    mark = 0xFF,
+                    payloadSize = payload.readableBytes())
+
+            val encoded = frame.encode(ByteBufAllocator.DEFAULT)
+            val decoded = WireFrame.decode(encoded)
+
+            it("should decode major version") {
+                assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
+            }
+
+            it("should decode minor version") {
+                assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion)
+            }
+
+            it("should decode mark") {
+                assertThat(decoded.mark).isEqualTo(frame.mark)
+            }
+
+            it("should decode payload size") {
+                assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize)
+            }
+
+            it("should decode payload") {
+                assertThat(decoded.payload.toString(Charsets.US_ASCII))
+                        .isEqualTo(payloadContent)
+            }
+
+            it("should retain decoded payload") {
+                encoded.release()
+                assertThat(decoded.payload.refCnt()).isEqualTo(1)
+            }
+        }
+    }
+})
\ No newline at end of file
index 2cfb785..5990fd0 100644 (file)
@@ -47,21 +47,22 @@ object VesHvSpecification : Spek({
             val msgWithInvalidPayload = invalidVesMessage()
             val msgWithInvalidFrame = invalidWireFrame()
             val validMessage = vesMessage(Domain.HVRANMEAS)
+            val refCntBeforeSending = msgWithInvalidDomain.refCnt()
 
             sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
 
             assertThat(msgWithInvalidDomain.refCnt())
                     .describedAs("message with invalid domain should be released")
-                    .isEqualTo(0)
+                    .isEqualTo(refCntBeforeSending)
             assertThat(msgWithInvalidPayload.refCnt())
                     .describedAs("message with invalid payload should be released")
-                    .isEqualTo(0)
+                    .isEqualTo(refCntBeforeSending)
             assertThat(msgWithInvalidFrame.refCnt())
                     .describedAs("message with invalid frame should be released")
-                    .isEqualTo(0)
+                    .isEqualTo(refCntBeforeSending)
             assertThat(validMessage.refCnt())
                     .describedAs("handled message should be released")
-                    .isEqualTo(0)
+                    .isEqualTo(refCntBeforeSending)
         }
     }