/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import org.apache.commons.lang.builder.ToStringBuilder
+import org.apache.kafka.clients.producer.Callback
+import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig.*
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-import org.springframework.kafka.support.SendResult
-import org.springframework.util.concurrent.ListenableFutureCallback
+import java.nio.charset.Charset
class KafkaBasicAuthMessageProducerService(
private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
- private var kafkaTemplate: KafkaTemplate<String, Any>? = null
+ private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
+
+ private val messageLoggerService = MessageLoggerService()
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
- return sendMessage(messageProducerProperties.topic!!, message)
+ return sendMessageNB(messageProducerProperties.topic!!, message)
}
- override suspend fun sendMessageNB(topic: String, message: Any): Boolean {
- val serializedMessage = when (message) {
- is String -> {
- message
- }
- else -> {
- message.asJsonType().toString()
- }
- }
- val future = messageTemplate().send(topic, serializedMessage)
+ override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
+ checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
+ return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+ }
- future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> {
- override fun onSuccess(result: SendResult<String, Any>) {
- log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]")
- }
+ override suspend fun sendMessageNB(topic: String, message: Any,
+ headers: MutableMap<String, String>?): Boolean {
+ val byteArrayMessage = when (message) {
+ is String -> message.toByteArray(Charset.defaultCharset())
+ else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+ }
- override fun onFailure(ex: Throwable) {
- log.error("Unable to send message", ex)
- }
- })
+ val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val recordHeaders = record.headers()
+ messageLoggerService.messageProducing(recordHeaders)
+ headers?.let {
+ headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
+ }
+ val callback = Callback { metadata, exception ->
+ log.info("message published offset(${metadata.offset()}, headers :$headers )")
+ }
+ messageTemplate().send(record, callback).get()
return true
}
- private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
- log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+ fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
+ log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
val configProps = hashMapOf<String, Any>()
configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
- configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+ configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
if (messageProducerProperties.clientId != null) {
configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
}
if (additionalConfig != null) {
configProps.putAll(additionalConfig)
}
- return DefaultKafkaProducerFactory(configProps)
- }
- fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
- log.info("Prepering templates")
- if (kafkaTemplate == null) {
- kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
+ if (kafkaProducer == null) {
+ kafkaProducer = KafkaProducer(configProps)
}
- return kafkaTemplate!!
+ return kafkaProducer!!
}
}