Harmonize logging and add new logs
[dcaegen2/collectors/hv-ves.git] / sources / hv-collector-core / src / main / kotlin / org / onap / dcae / collectors / veshv / impl / adapters / kafka / KafkaSink.kt
index a0c2241..c4d6c87 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderRecord
 import reactor.kafka.sender.SenderResult
@@ -40,8 +41,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
     override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
         val records = messages.map(this::vesToKafkaRecord)
         val result = sender.send(records)
-                .doOnNext(::logException)
-                .filter(::isSuccessful)
+                .doOnNext {
+                    if (it.isSuccessful()) {
+                        Mono.just(it)
+                    } else {
+                        logger.warn(it.exception()) { "Failed to send message to Kafka" }
+                        Mono.empty<SenderResult<RoutedMessage>>()
+                    }
+                }
                 .map { it.correlationMetadata() }
 
         return if (logger.traceEnabled) {
@@ -61,12 +68,6 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
                 msg)
     }
 
-    private fun logException(senderResult: SenderResult<out Any>) {
-        if (senderResult.exception() != null) {
-            logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
-        }
-    }
-
     private fun logSentMessage(sentMsg: RoutedMessage) {
         logger.trace {
             val msgNum = sentMessages.incrementAndGet()
@@ -74,7 +75,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
         }
     }
 
-    private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+    private fun SenderResult<out Any>.isSuccessful() = exception() == null
 
     companion object {
         val logger = Logger(KafkaSink::class)