Add message consumer dynamic functions. 81/97681/1
authorBrinda Santh <bs2796@att.com>
Tue, 29 Oct 2019 14:47:00 +0000 (10:47 -0400)
committerBrinda Santh <bs2796@att.com>
Tue, 29 Oct 2019 14:47:00 +0000 (10:47 -0400)
Issue-ID: CCSDK-1668
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I51be88598557a05ef5db7cd595689a8e4a653fdc

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml

index 0b899f2..184e85b 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -33,7 +34,7 @@ open class MessageConsumerProperties
 open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
     lateinit var bootstrapServers: String
     lateinit var groupId: String
-    var clientId: String? = null
+    lateinit var clientId: String
     var topic: String? = null
     var autoCommit: Boolean = true
     var autoOffsetReset: String = "latest"
index 25f0bf4..8bcc758 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+/** Consumer Function Interfaces */
+interface ConsumerFunction
 
 interface BlueprintMessageConsumerService {
 
+    suspend fun subscribe(): Channel<String> {
+        return subscribe(null)
+    }
+
     /** Subscribe to the Kafka channel with [additionalConfig] */
     suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
 
     /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
     suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
 
+    /** Consume and execute dynamic function [consumerFunction] */
+    suspend fun consume(consumerFunction: ConsumerFunction) {
+        consume(null, consumerFunction)
+    }
+
+    /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+    suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+        throw BluePrintProcessorException("Not Implemented")
+    }
+
+    /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+    suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
+                        consumerFunction: ConsumerFunction) {
+        throw BluePrintProcessorException("Not Implemented")
+    }
+
     /** close the channel, consumer and other resources */
     suspend fun shutDown()
-
+}
+/** Consumer dynamic implementation interface */
+interface KafkaConsumerRecordsFunction : ConsumerFunction {
+    suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>,
+                       consumerRecords: ConsumerRecords<*, *>)
 }
\ No newline at end of file
index a4ccfa9..757846c 100644 (file)
@@ -38,7 +38,6 @@ open class KafkaBasicAuthMessageConsumerService(
     : BlueprintMessageConsumerService {
 
     val log = logger(KafkaBasicAuthMessageConsumerService::class)
-
     val channel = Channel<String>()
     var kafkaConsumer: Consumer<String, ByteArray>? = null
 
@@ -57,9 +56,8 @@ open class KafkaBasicAuthMessageConsumerService(
         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
-        if (messageConsumerProperties.clientId != null) {
-            configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
-        }
+        configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId
+
         /** To handle Back pressure, Get only configured record for processing */
         if (messageConsumerProperties.pollRecords > 0) {
             configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
@@ -79,7 +77,7 @@ open class KafkaBasicAuthMessageConsumerService(
     }
 
 
-    override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
         /** Create Kafka consumer */
         kafkaConsumer = kafkaConsumer(additionalConfig)
 
@@ -89,15 +87,15 @@ open class KafkaBasicAuthMessageConsumerService(
                     "topics(${messageConsumerProperties.bootstrapServers})"
         }
 
-        kafkaConsumer!!.subscribe(consumerTopic)
-        log.info("Successfully consumed topic($consumerTopic)")
+        kafkaConsumer!!.subscribe(topics)
+        log.info("Successfully consumed topic($topics)")
 
-        thread(start = true, name = "KafkaConsumer") {
+        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
             keepGoing = true
             kafkaConsumer!!.use { kc ->
                 while (keepGoing) {
                     val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
-                    log.info("Consumed Records : ${consumerRecords.count()}")
+                    log.trace("Consumed Records : ${consumerRecords.count()}")
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
                             /** execute the command block */
@@ -119,6 +117,46 @@ open class KafkaBasicAuthMessageConsumerService(
         return channel
     }
 
+    override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+        /** get to topic names */
+        val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
+        check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
+        return consume(topics = consumerTopic, additionalConfig = additionalConfig, consumerFunction = consumerFunction)
+    }
+
+    override suspend fun consume(topics: List<String>, additionalConfig: Map<String, Any>?,
+                                 consumerFunction: ConsumerFunction) {
+
+        val kafkaConsumerFunction = consumerFunction as KafkaConsumerRecordsFunction
+
+        /** Create Kafka consumer */
+        kafkaConsumer = kafkaConsumer(additionalConfig)
+
+        checkNotNull(kafkaConsumer) {
+            "failed to create kafka consumer for " +
+                    "server(${messageConsumerProperties.bootstrapServers})'s " +
+                    "topics(${messageConsumerProperties.bootstrapServers})"
+        }
+
+        kafkaConsumer!!.subscribe(topics)
+        log.info("Successfully consumed topic($topics)")
+
+        thread(start = true, name = "KafkaConsumer-${messageConsumerProperties.clientId}") {
+            keepGoing = true
+            kafkaConsumer!!.use { kc ->
+                while (keepGoing) {
+                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+                    log.trace("Consumed Records : ${consumerRecords.count()}")
+                    runBlocking {
+                        /** Execute dynamic consumer Block substitution */
+                        kafkaConsumerFunction.invoke(messageConsumerProperties, kc, consumerRecords)
+                    }
+                }
+                log.info("message listener shutting down.....")
+            }
+        }
+    }
+
     override suspend fun shutDown() {
         /** stop the polling loop */
         keepGoing = false
index 86c2ec5..bdceec7 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -22,15 +23,14 @@ import kotlinx.coroutines.channels.consumeEach
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.clients.consumer.MockConsumer
-import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.clients.consumer.*
 import org.apache.kafka.common.TopicPartition
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.annotation.DirtiesContext
@@ -110,6 +110,54 @@ open class BlueprintMessageConsumerServiceTest {
         }
     }
 
+    @Test
+    fun testKafkaBasicAuthConsumerWithDynamicFunction() {
+        runBlocking {
+            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
+
+            val topic = "default-topic"
+            val partitions: MutableList<TopicPartition> = arrayListOf()
+            val topicsCollection: MutableList<String> = arrayListOf()
+            partitions.add(TopicPartition(topic, 1))
+            val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+            val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+
+            val records: Long = 10
+            partitions.forEach { partition ->
+                partitionsBeginningMap[partition] = 0L
+                partitionsEndMap[partition] = records
+                topicsCollection.add(partition.topic())
+            }
+            val mockKafkaConsumer = MockConsumer<String, ByteArray>(OffsetResetStrategy.EARLIEST)
+            mockKafkaConsumer.subscribe(topicsCollection)
+            mockKafkaConsumer.rebalance(partitions)
+            mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
+            mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
+            for (i in 1..10) {
+                val record = ConsumerRecord<String, ByteArray>(topic, 1, i.toLong(), "key_$i",
+                        "I am message $i".toByteArray())
+                mockKafkaConsumer.addRecord(record)
+            }
+
+            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
+            /** Test Consumer Function implementation */
+            val consumerFunction = object : KafkaConsumerRecordsFunction {
+                override suspend fun invoke(messageConsumerProperties: MessageConsumerProperties,
+                                            consumer: Consumer<*, *>, consumerRecords: ConsumerRecords<*, *>) {
+                    val count = consumerRecords.count()
+                    log.trace("Received Message count($count)")
+                }
+            }
+            spyBlueprintMessageConsumerService.consume(consumerFunction)
+            delay(10)
+            spyBlueprintMessageConsumerService.shutDown()
+        }
+    }
+
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
     //@Test
     fun testKafkaIntegration() {
index 5f4fb4f..820041f 100644 (file)
@@ -27,7 +27,7 @@
         <!-- encoders are assigned the type
              ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
         <encoder>
-            <pattern>${testing}</pattern>
+            <pattern>${localPattern}</pattern>
         </encoder>
     </appender>