Write performance tests
[dcaegen2/collectors/hv-ves.git] / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / wire / WireFrameSink.kt
index abebff3..540c647 100644 (file)
@@ -35,24 +35,27 @@ internal class WireFrameSink(
         private val streamBuffer: ByteBuf,
         private val sink: FluxSink<WireFrame>,
         private val requestedFrameCount: Long) {
+    private var completed = false
 
     fun handleSubscriber() {
-        logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+        if (!completed) {
+            logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
 
-        try {
-            if (requestedFrameCount == Long.MAX_VALUE) {
-                logger.trace { "Push based strategy" }
-                pushAvailableFrames()
-            } else {
-                logger.trace { "Pull based strategy - req $requestedFrameCount" }
-                pushUpToNumberOfFrames()
+            try {
+                if (requestedFrameCount == Long.MAX_VALUE) {
+                    logger.trace { "Push based strategy" }
+                    pushAvailableFrames()
+                } else {
+                    logger.trace { "Pull based strategy - req $requestedFrameCount" }
+                    pushUpToNumberOfFrames()
+                }
+            } catch (ex: Exception) {
+                completed = true
+                sink.error(ex)
             }
-        } catch (ex: Exception) {
-            sink.error(ex)
-        }
-
-        logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
 
+            logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+        }
     }
 
     private fun pushAvailableFrames() {
@@ -61,6 +64,7 @@ internal class WireFrameSink(
             sink.next(nextFrame)
             nextFrame = decodeFirstFrameFromBuffer()
         }
+        completed = true
         sink.complete()
     }
 
@@ -76,6 +80,7 @@ internal class WireFrameSink(
             }
         }
         if (remaining > 0 && nextFrame == null) {
+            completed = true
             sink.complete()
         }
     }