Add Message tracing logger service.
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / KafkaBasicAuthMessageConsumerService.kt
index b5d444a..a4ccfa9 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -24,30 +25,38 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.nio.charset.Charset
 import java.time.Duration
 import kotlin.concurrent.thread
 
-class KafkaBasicAuthMessageConsumerService(
+open class KafkaBasicAuthMessageConsumerService(
         private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
     : BlueprintMessageConsumerService {
 
-    private val channel = Channel<String>()
-    private var kafkaConsumer: Consumer<String, String>? = null
     val log = logger(KafkaBasicAuthMessageConsumerService::class)
 
+    val channel = Channel<String>()
+    var kafkaConsumer: Consumer<String, ByteArray>? = null
+
     @Volatile
     var keepGoing = true
 
-    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> {
+    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
         val configProperties = hashMapOf<String, Any>()
         configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
         configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
-        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+        configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
+        /**
+         * earliest: automatically reset the offset to the earliest offset
+         * latest: automatically reset the offset to the latest offset
+         */
+        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
         if (messageConsumerProperties.clientId != null) {
             configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
         }
@@ -95,7 +104,7 @@ class KafkaBasicAuthMessageConsumerService(
                             consumerRecord.value()?.let {
                                 launch {
                                     if (!channel.isClosedForSend) {
-                                        channel.send(it)
+                                        channel.send(String(it, Charset.defaultCharset()))
                                     } else {
                                         log.error("Channel is closed to receive message")
                                     }