Use generator to simplify the WireFrame decoding 27/58627/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 28 Jun 2018 08:09:24 +0000 (10:09 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 08:51:38 +0000 (10:51 +0200)
Performance tests have proven that manual creation of the Flux doesn't
give us any performance benefits. On the other side it is complicated
and error prone.

Closes ONAP-438

Change-Id: I45912f91a52cbc84322775f7bae6d73afda079b8
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt [deleted file]
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt [deleted file]

diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
deleted file mode 100644 (file)
index b788f51..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.CompositeByteBuf
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import reactor.core.publisher.FluxSink
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.function.Consumer
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class StreamBufferEmitter(
-        private val decoder: WireFrameDecoder,
-        private val streamBuffer: CompositeByteBuf,
-        private val newFrame: ByteBuf)
-    : Consumer<FluxSink<WireFrame>> {
-
-    private val subscribed = AtomicBoolean(false)
-
-    override fun accept(sink: FluxSink<WireFrame>) {
-        when {
-
-            subscribed.getAndSet(true) ->
-                sink.error(IllegalStateException("Wire frame emitter supports only one subscriber"))
-
-            newFrame.readableBytes() == 0 -> {
-                logger.trace { "Discarding empty buffer" }
-                newFrame.release()
-                sink.complete()
-            }
-
-            else -> {
-                streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
-                sink.onDispose {
-                    logger.trace { "Disposing read components" }
-                    streamBuffer.discardReadComponents()
-                }
-                sink.onRequest { requestedFrameCount ->
-                    WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber()
-                }
-            }
-        }
-    }
-
-    companion object {
-        fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
-                Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame))
-
-        private const val INCREASE_WRITER_INDEX = true
-        private val logger = Logger(StreamBufferEmitter::class)
-    }
-}
index cfb61b3..d1d7259 100644 (file)
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.domain.WireFrame
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
@@ -35,46 +36,47 @@ internal class WireChunkDecoder(
         private val decoder: WireFrameDecoder,
         alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
     private val streamBuffer = alloc.compositeBuffer()
-    
-//  TODO: use this implementation and cleanup the rest
-//    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> {
-//        if (byteBuf.readableBytes() == 0) {
-//            byteBuf.release()
-//            Flux.empty()
-//        } else {
-//            streamBuffer.addComponent(true, byteBuf)
-//            Flux.generate { next ->
-//                try {
-//                    val frame = decodeFirstFrameFromBuffer()
-//                    if (frame == null)
-//                        next.complete()
-//                    else
-//                        next.next(frame)
-//                } catch (ex: Exception) {
-//                    next.error(ex)
-//                }
-//            }
-//        }
-//    }.doOnTerminate { streamBuffer.discardReadComponents() }
-//
-//
-//    private fun decodeFirstFrameFromBuffer(): WireFrame? =
-//            try {
-//                decoder.decodeFirst(streamBuffer)
-//            } catch (ex: MissingWireFrameBytesException) {
-//                logger.trace { "${ex.message} - waiting for more data" }
-//                null
-//            }
-
-    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
-            .createFlux(decoder, streamBuffer, byteBuf)
-            .doOnSubscribe { logIncomingMessage(byteBuf) }
-            .doOnNext(this::logDecodedWireMessage)
 
     fun release() {
         streamBuffer.release()
     }
 
+    fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer {
+        logIncomingMessage(byteBuf)
+        if (byteBuf.readableBytes() == 0) {
+            byteBuf.release()
+            Flux.empty()
+        } else {
+            streamBuffer.addComponent(true, byteBuf)
+            generateFrames().doOnTerminate { streamBuffer.discardReadComponents() }
+        }
+    }
+
+    private fun generateFrames(): Flux<WireFrame> = Flux.generate { next ->
+        try {
+            val frame = decodeFirstFrameFromBuffer()
+            if (frame == null) {
+                logEndOfData()
+                next.complete()
+            } else {
+                logDecodedWireMessage(frame)
+                next.next(frame)
+            }
+        } catch (ex: Exception) {
+            next.error(ex)
+        }
+    }
+
+
+    private fun decodeFirstFrameFromBuffer(): WireFrame? =
+            try {
+                decoder.decodeFirst(streamBuffer)
+            } catch (ex: MissingWireFrameBytesException) {
+                logger.trace { "${ex.message} - waiting for more data" }
+                null
+            }
+
+
     private fun logIncomingMessage(wire: ByteBuf) {
         logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
     }
@@ -83,6 +85,10 @@ internal class WireChunkDecoder(
         logger.trace { "Wire payload size: ${wire.payloadSize} B." }
     }
 
+    private fun logEndOfData() {
+        logger.trace { "End of data in current TCP buffer" }
+    }
+
     companion object {
         val logger = Logger(VesHvCollector::class)
     }
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
deleted file mode 100644 (file)
index 540c647..0000000
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import io.netty.buffer.ByteBuf
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.FluxSink
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class WireFrameSink(
-        private val decoder: WireFrameDecoder,
-        private val streamBuffer: ByteBuf,
-        private val sink: FluxSink<WireFrame>,
-        private val requestedFrameCount: Long) {
-    private var completed = false
-
-    fun handleSubscriber() {
-        if (!completed) {
-            logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
-
-            try {
-                if (requestedFrameCount == Long.MAX_VALUE) {
-                    logger.trace { "Push based strategy" }
-                    pushAvailableFrames()
-                } else {
-                    logger.trace { "Pull based strategy - req $requestedFrameCount" }
-                    pushUpToNumberOfFrames()
-                }
-            } catch (ex: Exception) {
-                completed = true
-                sink.error(ex)
-            }
-
-            logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
-        }
-    }
-
-    private fun pushAvailableFrames() {
-        var nextFrame = decodeFirstFrameFromBuffer()
-        while (nextFrame != null && !sink.isCancelled) {
-            sink.next(nextFrame)
-            nextFrame = decodeFirstFrameFromBuffer()
-        }
-        completed = true
-        sink.complete()
-    }
-
-    private fun pushUpToNumberOfFrames() {
-        var nextFrame = decodeFirstFrameFromBuffer()
-        var remaining = requestedFrameCount
-        loop@ while (nextFrame != null && !sink.isCancelled) {
-            sink.next(nextFrame)
-            if (--remaining > 0) {
-                nextFrame = decodeFirstFrameFromBuffer()
-            } else {
-                break@loop
-            }
-        }
-        if (remaining > 0 && nextFrame == null) {
-            completed = true
-            sink.complete()
-        }
-    }
-
-    private fun decodeFirstFrameFromBuffer(): WireFrame? =
-            try {
-                decoder.decodeFirst(streamBuffer)
-            } catch (ex: MissingWireFrameBytesException) {
-                logger.trace { "${ex.message} - waiting for more data" }
-                null
-            }
-
-    companion object {
-        private val logger = Logger(WireFrameSink::class)
-    }
-}