import arrow.effects.IO
import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
*/
internal class WireChunkDecoder(
private val decoder: WireFrameDecoder,
- alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- private val streamBuffer = alloc.compositeBuffer()
+ private val ctx: ClientContext) {
+ private val streamBuffer = ctx.alloc.compositeBuffer()
fun release() {
streamBuffer.release()
} else {
streamBuffer.addComponent(true, byteBuf)
generateFrames()
- .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
.doFinally { streamBuffer.discardReadComponents() }
}
}
}
private fun logIncomingMessage(wire: ByteBuf) {
- logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+ logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" }
}
private fun logDecodedWireMessage(wire: WireFrameMessage) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" }
}
private fun logEndOfData() {
- logger.trace { "End of data in current TCP buffer" }
+ logger.trace(ctx) { "End of data in current TCP buffer" }
}
companion object {