import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
import reactor.core.publisher.Flux
+import reactor.core.publisher.Flux.defer
import reactor.core.publisher.SynchronousSink
/**
streamBuffer.release()
}
- fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
+ fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = defer {
logIncomingMessage(byteBuf)
if (byteBuf.readableBytes() == 0) {
byteBuf.release()
} else {
streamBuffer.addComponent(true, byteBuf)
generateFrames()
- .onErrorResume { logger.handleReactiveStreamError(ctx::asMap, it, Flux.error(it)) }
+ .onErrorResume { logger.handleReactiveStreamError(ctx, it, Flux.error(it)) }
.doFinally { streamBuffer.discardReadComponents() }
}
}