private val streamBuffer: ByteBuf,
private val sink: FluxSink<WireFrame>,
private val requestedFrameCount: Long) {
+ private var completed = false
fun handleSubscriber() {
- logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+ if (!completed) {
+ logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
- try {
- if (requestedFrameCount == Long.MAX_VALUE) {
- logger.trace { "Push based strategy" }
- pushAvailableFrames()
- } else {
- logger.trace { "Pull based strategy - req $requestedFrameCount" }
- pushUpToNumberOfFrames()
+ try {
+ if (requestedFrameCount == Long.MAX_VALUE) {
+ logger.trace { "Push based strategy" }
+ pushAvailableFrames()
+ } else {
+ logger.trace { "Pull based strategy - req $requestedFrameCount" }
+ pushUpToNumberOfFrames()
+ }
+ } catch (ex: Exception) {
+ completed = true
+ sink.error(ex)
}
- } catch (ex: Exception) {
- sink.error(ex)
- }
-
- logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ }
}
private fun pushAvailableFrames() {
sink.next(nextFrame)
nextFrame = decodeFirstFrameFromBuffer()
}
+ completed = true
sink.complete()
}
}
}
if (remaining > 0 && nextFrame == null) {
+ completed = true
sink.complete()
}
}