Harmonize logging and add new logs
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / VesHvCollector.kt
index 2f12e0c..4176de9 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import arrow.core.Option
+import arrow.core.Either
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -49,9 +52,9 @@ internal class VesHvCollector(
             wireChunkDecoderSupplier(alloc).let { wireDecoder ->
                 dataStream
                         .transform { decodeWireFrame(it, wireDecoder) }
-                        .filter(WireFrameMessage::isValid)
-                        .transform(::decodePayload)
-                        .filter(VesMessage::isValid)
+                        .transform(::filterInvalidWireFrame)
+                        .transform(::decodeProtobufPayload)
+                        .transform(::filterInvalidProtobufMessages)
                         .transform(::routeMessage)
                         .onErrorResume { logger.handleReactiveStreamError(it) }
                         .doFinally { releaseBuffersMemory(wireDecoder) }
@@ -63,26 +66,38 @@ internal class VesHvCollector(
             .concatMap(decoder::decode)
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
-    private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+    private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
+            .filterFailedWithLog(MessageValidator::validateFrameMessage)
+
+    private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
             .map(WireFrameMessage::payload)
-            .map(protobufDecoder::decode)
-            .flatMap { omitWhenNone(it) }
+            .flatMap(::decodePayload)
+
+    private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
+            .decode(rawPayload)
+            .filterFailedWithLog(logger,
+                    { "Ves event header decoded successfully" },
+                    { "Failed to decode ves event header, reason: ${it.message}" })
+
+    private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
+            .filterFailedWithLog(MessageValidator::validateProtobufMessage)
 
     private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
             .flatMap(this::findRoute)
             .compose(sink::send)
             .doOnNext { metrics.notifyMessageSent(it.topic) }
 
-
-    private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
-
-    private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
-            {
-                logger.info("ommiting the message" + 5)
-                Mono.empty() },
-            { Mono.just(it) })
+    private fun findRoute(msg: VesMessage) = router
+            .findDestination(msg)
+            .filterEmptyWithLog(logger,
+                    { "Found route for message: ${it.topic}, partition: ${it.partition}" },
+                    { "Could not find route for message" })
 
     private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+            .also { logger.debug("Released buffer memory after handling message stream") }
+
+    fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+            filterFailedWithLog(logger, predicate)
 
     companion object {
         private val logger = Logger(VesHvCollector::class)