Use Either instead of exceptions in frame decoder 33/58633/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 28 Jun 2018 12:42:05 +0000 (14:42 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 09:04:01 +0000 (11:04 +0200)
Goals:
* Make code cleaner (in a FP way)
* Avoid costly exception throw each time we wait for the rest of the
frame (collecting stack traces is costly and we do not need them
anyway)

Closes ONAP-437
Change-Id: I40341d3c2cb85f3ff581d89167245cb009dbb070
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

12 files changed:
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt [moved from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt with 81% similarity]
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt [moved from hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt with 100% similarity]
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt [moved from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt with 66% similarity]
hv-collector-domain/pom.xml
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt [moved from hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt with 58% similarity]
hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt [deleted file]
hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt

index d1d7259..502505c 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl.wire
 
+import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
+import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
 import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
+import reactor.core.publisher.SynchronousSink
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -53,28 +56,29 @@ internal class WireChunkDecoder(
     }
 
     private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
-        try {
-            val frame = decodeFirstFrameFromBuffer()
-            if (frame == null) {
+        decoder.decodeFirst(streamBuffer)
+                .fold(onError(next), onSuccess(next))
+                .unsafeRunSync()
+    }
+
+    private fun onError(next: SynchronousSink<WireFrame>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+        when (err) {
+            is InvalidWireFrame -> IO {
+                next.error(WireFrameException(err))
+            }
+            is MissingWireFrameBytes -> IO {
                 logEndOfData()
                 next.complete()
-            } else {
-                logDecodedWireMessage(frame)
-                next.next(frame)
             }
-        } catch (ex: Exception) {
-            next.error(ex)
         }
     }
 
-
-    private fun decodeFirstFrameFromBuffer(): WireFrame? =
-            try {
-                decoder.decodeFirst(streamBuffer)
-            } catch (ex: MissingWireFrameBytesException) {
-                logger.trace { "${ex.message} - waiting for more data" }
-                null
-            }
+    private fun onSuccess(next: SynchronousSink<WireFrame>): (WireFrame) -> IO<Unit> = { frame ->
+        IO {
+            logDecodedWireMessage(frame)
+            next.next(frame)
+        }
+    }
 
 
     private fun logIncomingMessage(wire: ByteBuf) {
@@ -90,6 +94,6 @@ internal class WireChunkDecoder(
     }
 
     companion object {
-        val logger = Logger(VesHvCollector::class)
+        val logger = Logger(WireChunkDecoder::class)
     }
 }
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.domain.exceptions
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-open class WireFrameDecodingException(msg: String) : Exception(msg)
+class WireFrameException(error: WireFrameDecodingError)
+    : Exception("${error::class.simpleName}: ${error.message}")
index 1ddcc3d..33f7168 100644 (file)
@@ -30,7 +30,6 @@ import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
 import reactor.test.test
 
 /**
@@ -43,11 +42,11 @@ internal object WireChunkDecoderTest : Spek({
     val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
 
     val encoder = WireFrameEncoder(alloc)
-    
+
     fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
 
     fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
-    
+
     fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
         for (bb in byteBuffers) {
             assertThat(bb.refCnt())
@@ -90,7 +89,7 @@ internal object WireChunkDecoderTest : Spek({
 
             it("should yield error") {
                 createInstance().decode(input).test()
-                        .verifyError(InvalidWireFrameMarkerException::class.java)
+                        .verifyError(WireFrameException::class.java)
             }
 
             it("should leave memory unreleased") {
@@ -132,7 +131,7 @@ internal object WireChunkDecoderTest : Spek({
             it("should yield decoded input frame and error") {
                 createInstance().decode(input).test()
                         .expectNextMatches { it.payloadSize == samplePayload.size }
-                        .verifyError(InvalidWireFrameMarkerException::class.java)
+                        .verifyError(WireFrameException::class.java)
             }
 
             it("should leave memory unreleased") {
@@ -170,7 +169,7 @@ internal object WireChunkDecoderTest : Spek({
                         .expectNextMatches { it.payloadSize == samplePayload.size }
                         .verifyComplete()
                 cut.decode(input2).test()
-                        .verifyError(InvalidWireFrameMarkerException::class.java)
+                        .verifyError(WireFrameException::class.java)
             }
 
             it("should release memory for 1st input") {
index c68f051..00739fa 100644 (file)
@@ -41,12 +41,13 @@ import java.time.Duration
 import java.util.*
 import kotlin.system.measureTimeMillis
 
-
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
 object PerformanceSpecification : Spek({
+    debugRx(false)
+
     describe("VES High Volume Collector performance") {
         it("should handle multiple clients in reasonable time") {
             val sink = CountingSink()
@@ -69,8 +70,8 @@ object PerformanceSpecification : Spek({
 
             val durationSec = durationMs / 1000.0
             val throughput = sink.count / durationSec
-            println("Processed $runs connections each containing $numMessages msgs.")
-            println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+            logger.info("Processed $runs connections each containing $numMessages msgs.")
+            logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
             assertThat(sink.count)
                     .describedAs("should send all events")
                     .isEqualTo(runs * numMessages)
@@ -94,7 +95,7 @@ object PerformanceSpecification : Spek({
                     .timeout(timeout)
                     .block()
 
-            println("Forwarded ${sink.count} msgs")
+            logger.info("Forwarded ${sink.count} msgs")
             assertThat(sink.count)
                     .describedAs("should send up to number of events")
                     .isLessThan(numMessages)
index 0845059..49eedda 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.tests.component
 
-import com.google.protobuf.InvalidProtocolBufferException
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
 import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -35,6 +33,8 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
  * @since May 2018
  */
 object VesHvSpecification : Spek({
+    debugRx(false)
+
     describe("VES High Volume Collector") {
         it("should handle multiple HV RAN events") {
             val sink = StoringSink()
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.domain.exceptions
+package org.onap.dcae.collectors.veshv.tests.component
 
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.jetbrains.spek.api.dsl.SpecBody
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Hooks
 
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException(
-        "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+fun SpecBody.debugRx(debug: Boolean = true) {
+    if (debug) {
+        beforeGroup {
+            Hooks.onOperatorDebug()
+        }
+
+        afterGroup {
+            Hooks.resetOnOperatorDebug()
+        }
+    }
+}
+
+val logger = Logger("org.onap.dcae.collectors.veshv.tests.component")
index c11510a..85c2a45 100644 (file)
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-core</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.assertj</groupId>
index 3cd9b19..22767ed 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.domain
 
+import arrow.core.Either
+import arrow.core.Left
+import arrow.core.Right
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,49 +50,37 @@ class WireFrameEncoder(val allocator: ByteBufAllocator) {
  */
 class WireFrameDecoder {
 
-    fun decodeFirst(byteBuf: ByteBuf): WireFrame {
-        verifyNotEmpty(byteBuf)
-        byteBuf.markReaderIndex()
-
-        verifyMarker(byteBuf)
-        verifyMinimumSize(byteBuf)
-
-        val version = byteBuf.readUnsignedByte()
-        val payloadTypeRaw = byteBuf.readUnsignedByte()
-        val payloadSize = verifyPayloadSize(byteBuf)
-        val payload = ByteData.readFrom(byteBuf, payloadSize)
+    fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> =
+            when {
+                isEmpty(byteBuf)          -> Left(EmptyWireFrame)
+                headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
+                else                      -> parseFrame(byteBuf)
+            }
 
-        return WireFrame(payload, version, payloadTypeRaw, payloadSize)
-    }
+    private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrame.HEADER_SIZE
 
-    private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
-            byteBuf.readInt().let { payloadSize ->
-                if (byteBuf.readableBytes() < payloadSize) {
-                    byteBuf.resetReaderIndex()
-                    throw MissingWireFrameBytesException("readable bytes < payload size")
-                } else {
-                    payloadSize
-                }
-            }
+    private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
 
-    private fun verifyMinimumSize(byteBuf: ByteBuf) {
-        if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) {
-            byteBuf.resetReaderIndex()
-            throw MissingWireFrameBytesException("readable bytes < header size")
-        }
-    }
+    private fun parseFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrame> {
+        byteBuf.markReaderIndex()
 
-    private fun verifyMarker(byteBuf: ByteBuf) {
         val mark = byteBuf.readUnsignedByte()
         if (mark != WireFrame.MARKER_BYTE) {
             byteBuf.resetReaderIndex()
-            throw InvalidWireFrameMarkerException(mark)
+            return Left(InvalidWireFrameMarker(mark))
         }
-    }
 
-    private fun verifyNotEmpty(byteBuf: ByteBuf) {
-        if (byteBuf.readableBytes() < 1) {
-            throw EmptyWireFrameException()
+        val version = byteBuf.readUnsignedByte()
+        val payloadTypeRaw = byteBuf.readUnsignedByte()
+
+        val payloadSize = byteBuf.readInt()
+        if (byteBuf.readableBytes() < payloadSize) {
+            byteBuf.resetReaderIndex()
+            return Left(MissingWireFramePayloadBytes)
         }
+
+        val payload = ByteData.readFrom(byteBuf, payloadSize)
+
+        return Right(WireFrame(payload, version, payloadTypeRaw, payloadSize))
     }
 }
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.domain.exceptions
+package org.onap.dcae.collectors.veshv.domain
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)")
+
+sealed class WireFrameDecodingError(val message: String)
+
+
+// Invalid frame errors
+
+sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
+
+class InvalidWireFrameMarker(actualMarker: Short)
+    : InvalidWireFrame(
+        "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
+
+
+// Missing bytes errors
+
+sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
+
+object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
+object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
+object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt
deleted file mode 100644 (file)
index 7e4b3ce..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.domain.exceptions
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg)
index 9694caf..a97d889 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.domain
 
+import arrow.core.Either
+import arrow.core.identity
 import io.netty.buffer.Unpooled
 import io.netty.buffer.UnpooledByteBufAllocator
 import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
+import org.assertj.core.api.Assertions.fail
+import org.assertj.core.api.ObjectAssert
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
 import java.nio.charset.Charset
 
 /**
@@ -119,7 +120,7 @@ object WireFrameCodecsTest : Spek({
         describe("encode-decode methods' compatibility") {
             val frame = createSampleFrame()
             val encoded = encodeSampleFrame()
-            val decoded = decoder.decodeFirst(encoded)
+            val decoded = decoder.decodeFirst(encoded).getOrFail()
 
             it("should decode version") {
                 assertThat(decoded.version).isEqualTo(frame.version)
@@ -146,7 +147,7 @@ object WireFrameCodecsTest : Spek({
                 val buff = Unpooled.buffer()
                         .writeBytes(encodeSampleFrame())
                         .writeByte(0xAA)
-                val decoded = decoder.decodeFirst(buff)
+                val decoded = decoder.decodeFirst(buff).getOrFail()
 
                 assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
                 assertThat(buff.readableBytes()).isEqualTo(1)
@@ -156,8 +157,8 @@ object WireFrameCodecsTest : Spek({
                 val buff = Unpooled.buffer()
                         .writeByte(0xFF)
 
-                assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
-                        .isThrownBy { decoder.decodeFirst(buff) }
+                decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
+
             }
 
             it("should throw exception when first byte is not 0xFF but length looks ok") {
@@ -165,16 +166,14 @@ object WireFrameCodecsTest : Spek({
                         .writeByte(0xAA)
                         .writeBytes("some garbage".toByteArray())
 
-                assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
-                        .isThrownBy { decoder.decodeFirst(buff) }
+                decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(InvalidWireFrameMarker::class.java) }
             }
 
             it("should throw exception when first byte is not 0xFF and length is to short") {
                 val buff = Unpooled.buffer()
                         .writeByte(0xAA)
 
-                assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
-                        .isThrownBy { decoder.decodeFirst(buff) }
+                decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFrameHeaderBytes::class.java) }
             }
 
             it("should throw exception when payload doesn't fit") {
@@ -182,11 +181,17 @@ object WireFrameCodecsTest : Spek({
                         .writeBytes(encodeSampleFrame())
                 buff.writerIndex(buff.writerIndex() - 2)
 
-                assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
-                        .isThrownBy { decoder.decodeFirst(buff) }
+                decoder.decodeFirst(buff).assertFailedWithError { it.isInstanceOf(MissingWireFramePayloadBytes::class.java) }
             }
 
         }
     }
 
-})
\ No newline at end of file
+})
+
+private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) -> Unit) {
+    fold({ assertj(assertThat(it)) }, { fail("Error expected") })
+}
+
+private fun Either<WireFrameDecodingError, WireFrame>.getOrFail(): WireFrame =
+        fold({ fail(it.message) }, ::identity) as WireFrame