package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.model.ClientContextLogging.withWarn
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>, private val ctx: ClientContext) : Sink {
private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
if (it.isSuccessful()) {
Mono.just(it)
} else {
- logger.warn(it.exception()) { "Failed to send message to Kafka" }
+ logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
Mono.empty<SenderResult<RoutedMessage>>()
}
}
.map { it.correlationMetadata() }
- return if (logger.traceEnabled) {
- result.doOnNext(::logSentMessage)
- } else {
- result
- }
+ return result.doOnNext(::logSentMessage)
}
private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
}
private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace {
+ logger.trace(ctx) {
val msgNum = sentMessages.incrementAndGet()
"Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
}