Use Flux.transform in VesHvCollector 09/58709/1
authorJakub Dudycz <jdudycz@nokia.com>
Tue, 10 Jul 2018 10:29:32 +0000 (12:29 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 12:03:40 +0000 (14:03 +0200)
Goal: split the stream into logical parts

Closes ONAP-493
Change-Id: I87aa817a18674fad265df81b6a0b4a8f0c46b866
Signed-off-by: Jakub Dudycz <jdudycz@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt

index 06047fd..1bde6a1 100644 (file)
@@ -57,7 +57,7 @@ class CollectorFactory(val configuration: ConfigurationProvider,
         return VesHvCollector(
                 wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
                 protobufDecoder = VesDecoder(),
-                validator = MessageValidator(),
+                messageValidator = MessageValidator(),
                 router = Router(config.routing),
                 sink = sinkProvider(config),
                 metrics = metrics)
index ceae78c..511ccf3 100644 (file)
@@ -25,8 +25,8 @@ import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -45,7 +45,7 @@ import java.util.function.BiConsumer
 internal class VesHvCollector(
         private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
         private val protobufDecoder: VesDecoder,
-        private val validator: MessageValidator,
+        private val messageValidator: MessageValidator,
         private val router: Router,
         private val sink: Sink,
         private val metrics: Metrics) : Collector {
@@ -53,22 +53,32 @@ internal class VesHvCollector(
     override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
             wireChunkDecoderSupplier(alloc).let { wireDecoder ->
                 dataStream
-                        .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
-                        .concatMap(wireDecoder::decode)
-                        .handle(completeStreamOnEOT)
-                        .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+                        .transform { decodeWireFrame(it, wireDecoder) }
                         .filter(PayloadWireFrameMessage::isValid)
-                        .map(PayloadWireFrameMessage::payload)
-                        .map(protobufDecoder::decode)
-                        .filter(validator::isValid)
-                        .flatMap(this::findRoute)
-                        .compose(sink::send)
-                        .doOnNext { metrics.notifyMessageSent(it.topic) }
+                        .transform(::decodePayload)
+                        .filter(messageValidator::isValid)
+                        .transform(::routeMessage)
                         .doOnTerminate { releaseBuffersMemory(wireDecoder) }
-                        .onErrorResume(this::handleErrors)
+                        .onErrorResume(::handleErrors)
                         .then()
             }
 
+    private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+            .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
+            .concatMap(decoder::decode)
+            .handle(completeStreamOnEOT)
+            .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+
+    private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
+            .map(PayloadWireFrameMessage::payload)
+            .map(protobufDecoder::decode)
+
+
+    private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+            .flatMap(this::findRoute)
+            .compose(sink::send)
+            .doOnNext { metrics.notifyMessageSent(it.topic) }
+
     private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
 
     private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
@@ -76,14 +86,14 @@ internal class VesHvCollector(
                     { Mono.empty() },
                     { Mono.just(it) })
 
+    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+
     private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
         logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
         logger.debug("Detailed stack trace", ex)
         return Flux.empty()
     }
 
-    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-
     companion object {
         private val logger = Logger(VesHvCollector::class)