/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
+ private val messageLoggerService = MessageLoggerService()
+
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
return sendMessageNB(messageProducerProperties.topic!!, message)
}
val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val recordHeaders = record.headers()
+ messageLoggerService.messageProducing(recordHeaders)
headers?.let {
- headers.forEach { (key, value) -> record.headers().add(RecordHeader(key, value.toByteArray())) }
+ headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
}
val callback = Callback { metadata, exception ->
log.info("message published offset(${metadata.offset()}, headers :$headers )")