import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
val result = sender.send(records)
- .doOnNext(::logException)
- .filter(::isSuccessful)
+ .doOnNext {
+ if (it.isSuccessful()) {
+ Mono.just(it)
+ } else {
+ logger.warn(it.exception()) { "Failed to send message to Kafka" }
+ Mono.empty<SenderResult<RoutedMessage>>()
+ }
+ }
.map { it.correlationMetadata() }
return if (logger.traceEnabled) {
msg)
}
- private fun logException(senderResult: SenderResult<out Any>) {
- if (senderResult.exception() != null) {
- logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
- }
- }
-
private fun logSentMessage(sentMsg: RoutedMessage) {
logger.trace {
val msgNum = sentMessages.incrementAndGet()
}
}
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+ private fun SenderResult<out Any>.isSuccessful() = exception() == null
companion object {
val logger = Logger(KafkaSink::class)