Add metrics for dropped messages
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / VesHvCollector.kt
index ca1605e..b29432f 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import arrow.core.Either
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
@@ -29,9 +28,15 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
 import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import reactor.core.publisher.Flux
@@ -66,7 +71,11 @@ internal class VesHvCollector(
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
     private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
-            .filterFailedWithLog(MessageValidator::validateFrameMessage)
+            .filterFailedWithLog {
+                MessageValidator
+                        .validateFrameMessage(it)
+                        .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+            }
 
     private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
             .map(WireFrameMessage::payload)
@@ -74,12 +83,17 @@ internal class VesHvCollector(
 
     private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
             .decode(rawPayload)
+            .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
             .filterFailedWithLog(logger, clientContext::fullMdc,
                     { "Ves event header decoded successfully" },
                     { "Failed to decode ves event header, reason: ${it.message}" })
 
     private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
-            .filterFailedWithLog(MessageValidator::validateProtobufMessage)
+            .filterFailedWithLog {
+                MessageValidator
+                        .validateProtobufMessage(it)
+                        .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+            }
 
     private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
             .flatMap(this::findRoute)
@@ -88,6 +102,7 @@ internal class VesHvCollector(
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
+            .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
             .filterEmptyWithLog(logger, clientContext::fullMdc,
                     { "Found route for message: ${it.topic}, partition: ${it.partition}" },
                     { "Could not find route for message" })
@@ -95,7 +110,7 @@ internal class VesHvCollector(
     private fun releaseBuffersMemory() = wireChunkDecoder.release()
             .also { logger.debug { "Released buffer memory after handling message stream" } }
 
-    fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+    private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
             filterFailedWithLog(logger, clientContext::fullMdc, predicate)
 
     companion object {