/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.nio.charset.Charset
import java.time.Duration
import kotlin.concurrent.thread
-class KafkaBasicAuthMessageConsumerService(
+open class KafkaBasicAuthMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
: BlueprintMessageConsumerService {
- private val channel = Channel<String>()
- private var kafkaConsumer: Consumer<String, String>? = null
val log = logger(KafkaBasicAuthMessageConsumerService::class)
+ val channel = Channel<String>()
+ var kafkaConsumer: Consumer<String, ByteArray>? = null
+
@Volatile
var keepGoing = true
- fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> {
+ fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
val configProperties = hashMapOf<String, Any>()
configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+ configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
+ /**
+ * earliest: automatically reset the offset to the earliest offset
+ * latest: automatically reset the offset to the latest offset
+ */
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
if (messageConsumerProperties.clientId != null) {
configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
}
consumerRecord.value()?.let {
launch {
if (!channel.isClosedForSend) {
- channel.send(it)
+ channel.send(String(it, Charset.defaultCharset()))
} else {
log.error("Channel is closed to receive message")
}