import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
internal class VesHvCollector(
private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
private val protobufDecoder: VesDecoder,
- private val validator: MessageValidator,
+ private val messageValidator: MessageValidator,
private val router: Router,
private val sink: Sink,
private val metrics: Metrics) : Collector {
override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
- .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
- .concatMap(wireDecoder::decode)
- .handle(completeStreamOnEOT)
- .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+ .transform { decodeWireFrame(it, wireDecoder) }
.filter(PayloadWireFrameMessage::isValid)
- .map(PayloadWireFrameMessage::payload)
- .map(protobufDecoder::decode)
- .filter(validator::isValid)
- .flatMap(this::findRoute)
- .compose(sink::send)
- .doOnNext { metrics.notifyMessageSent(it.topic) }
+ .transform(::decodePayload)
+ .filter(messageValidator::isValid)
+ .transform(::routeMessage)
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
- .onErrorResume(this::handleErrors)
+ .onErrorResume(::handleErrors)
.then()
}
+ private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+ .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
+ .concatMap(decoder::decode)
+ .handle(completeStreamOnEOT)
+ .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+
+ private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
+ .map(PayloadWireFrameMessage::payload)
+ .map(protobufDecoder::decode)
+
+
+ private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+ .flatMap(this::findRoute)
+ .compose(sink::send)
+ .doOnNext { metrics.notifyMessageSent(it.topic) }
+
private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
{ Mono.empty() },
{ Mono.just(it) })
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+
private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
logger.debug("Detailed stack trace", ex)
return Flux.empty()
}
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-
companion object {
private val logger = Logger(VesHvCollector::class)