Merge "Add Kafka message lib consumer services"
authorDan Timoney <dtimoney@att.com>
Thu, 5 Sep 2019 18:54:33 +0000 (18:54 +0000)
committerGerrit Code Review <gerrit@onap.org>
Thu, 5 Sep 2019 18:54:33 +0000 (18:54 +0000)
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt

index 644c518..281a970 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.message
 
 
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.ComponentScan
 import org.springframework.context.annotation.Configuration
@@ -26,10 +31,37 @@ import org.springframework.context.annotation.Configuration
 @EnableConfigurationProperties
 open class BluePrintMessageLibConfiguration
 
+/**
+ * Exposed Dependency Service by this Message Lib Module
+ */
+fun BluePrintDependencyService.messageLibPropertyService(): BluePrintMessageLibPropertyService =
+        instance(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
+
+/** Extension functions for message producer service **/
+fun BluePrintDependencyService.messageProducerService(selector: String): BlueprintMessageProducerService {
+    return messageLibPropertyService().blueprintMessageProducerService(selector)
+}
+
+
+fun BluePrintDependencyService.messageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
+    return messageLibPropertyService().blueprintMessageProducerService(jsonNode)
+}
+
+/** Extension functions for message consumer service **/
+fun BluePrintDependencyService.messageConsumerService(selector: String): BlueprintMessageConsumerService {
+    return messageLibPropertyService().blueprintMessageConsumerService(selector)
+}
+
+fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
+    return messageLibPropertyService().blueprintMessageConsumerService(jsonNode)
+}
+
 class MessageLibConstants {
     companion object {
         const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
-        const val PROPERTY_MESSAGE_CLIENT_PREFIX = "blueprintsprocessor.messageclient."
+        //TODO("Change to .messageconsumer in application.properties")
+        const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient."
+        const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
         const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
     }
 }
\ No newline at end of file
index e621ec6..c77cdfd 100644 (file)
@@ -16,7 +16,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message
 
-
+/** Producer Properties **/
 open class MessageProducerProperties
 
 
@@ -24,4 +24,17 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
     lateinit var bootstrapServers: String
     var topic: String? = null
     var clientId: String? = null
-}
\ No newline at end of file
+}
+
+/** Consumer Properties **/
+
+open class MessageConsumerProperties
+
+open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
+    lateinit var bootstrapServers: String
+    lateinit var groupId: String
+    var consumerTopic: String? = null
+    var pollMillSec: Long = 100
+}
+
+open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
index fb01ce1..7c56ea4 100644 (file)
@@ -18,9 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import com.fasterxml.jackson.databind.JsonNode
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.*
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
 import org.springframework.stereotype.Service
@@ -28,22 +26,22 @@ import org.springframework.stereotype.Service
 @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
 open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) {
 
-    fun blueprintMessageClientService(jsonNode: JsonNode): BlueprintMessageProducerService {
-        val messageClientProperties = messageClientProperties(jsonNode)
-        return blueprintMessageClientService(messageClientProperties)
+    fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
+        val messageClientProperties = messageProducerProperties(jsonNode)
+        return blueprintMessageProducerService(messageClientProperties)
     }
 
-    fun blueprintMessageClientService(selector: String): BlueprintMessageProducerService {
-        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CLIENT_PREFIX}$selector"
-        val messageClientProperties = messageClientProperties(prefix)
-        return blueprintMessageClientService(messageClientProperties)
+    fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
+        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
+        val messageClientProperties = messageProducerProperties(prefix)
+        return blueprintMessageProducerService(messageClientProperties)
     }
 
-    fun messageClientProperties(prefix: String): MessageProducerProperties {
+    fun messageProducerProperties(prefix: String): MessageProducerProperties {
         val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
         return when (type) {
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
-                kafkaBasicAuthMessageClientProperties(prefix)
+                kafkaBasicAuthMessageProducerProperties(prefix)
             }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -51,7 +49,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
         }
     }
 
-    fun messageClientProperties(jsonNode: JsonNode): MessageProducerProperties {
+    fun messageProducerProperties(jsonNode: JsonNode): MessageProducerProperties {
         val type = jsonNode.get("type").textValue()
         return when (type) {
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
@@ -63,7 +61,7 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
         }
     }
 
-    private fun blueprintMessageClientService(MessageProducerProperties: MessageProducerProperties)
+    private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties)
             : BlueprintMessageProducerService {
 
         when (MessageProducerProperties) {
@@ -76,9 +74,67 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
         }
     }
 
-    private fun kafkaBasicAuthMessageClientProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
+    private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
         return bluePrintProperties.propertyBeanType(
                 prefix, KafkaBasicAuthMessageProducerProperties::class.java)
     }
 
+    /** Consumer Property Lib Service Implementation **/
+
+    /** Return Message Consumer Service for [jsonNode] definitions. */
+    fun blueprintMessageConsumerService(jsonNode: JsonNode): BlueprintMessageConsumerService {
+        val messageConsumerProperties = messageConsumerProperties(jsonNode)
+        return blueprintMessageConsumerService(messageConsumerProperties)
+    }
+
+    /** Return Message Consumer Service for [selector] definitions. */
+    fun blueprintMessageConsumerService(selector: String): BlueprintMessageConsumerService {
+        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}$selector"
+        val messageClientProperties = messageConsumerProperties(prefix)
+        return blueprintMessageConsumerService(messageClientProperties)
+    }
+
+    /** Return Message Consumer Properties for [prefix] definitions. */
+    fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
+        val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
+        return when (type) {
+            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+                kafkaBasicAuthMessageConsumerProperties(prefix)
+            }
+            else -> {
+                throw BluePrintProcessorException("Message adaptor($type) is not supported")
+            }
+        }
+    }
+
+    fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
+        val type = jsonNode.get("type").textValue()
+        return when (type) {
+            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
+            }
+            else -> {
+                throw BluePrintProcessorException("Message adaptor($type) is not supported")
+            }
+        }
+    }
+
+    private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties)
+            : BlueprintMessageConsumerService {
+
+        when (messageConsumerProperties) {
+            is KafkaBasicAuthMessageConsumerProperties -> {
+                return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+            }
+            else -> {
+                throw BluePrintProcessorException("couldn't get Message client service for")
+            }
+        }
+    }
+
+    private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
+        return bluePrintProperties.propertyBeanType(
+                prefix, KafkaBasicAuthMessageConsumerProperties::class.java)
+    }
+
 }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
new file mode 100644 (file)
index 0000000..25f0bf4
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.channels.Channel
+
+interface BlueprintMessageConsumerService {
+
+    /** Subscribe to the Kafka channel with [additionalConfig] */
+    suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
+
+    /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
+    suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+
+    /** close the channel, consumer and other resources */
+    suspend fun shutDown()
+
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
new file mode 100644 (file)
index 0000000..076501e
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.time.Duration
+import kotlin.concurrent.thread
+
+class KafkaBasicAuthMessageConsumerService(
+        private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties)
+    : BlueprintMessageConsumerService {
+
+    private val channel = Channel<String>()
+    private var kafkaConsumer: Consumer<String, String>? = null
+    val log = logger(KafkaBasicAuthMessageConsumerService::class)
+
+    @Volatile
+    var keepGoing = true
+
+    fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, String> {
+        val configProperties = hashMapOf<String, Any>()
+        configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
+        configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
+        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
+        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        /** add or override already set properties */
+        additionalConfig?.let { configProperties.putAll(it) }
+        /** Create Kafka consumer */
+        return KafkaConsumer(configProperties)
+    }
+
+    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+        /** get to topic names */
+        val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() }
+        check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
+        return subscribe(consumerTopic, additionalConfig)
+    }
+
+
+    override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+        /** Create Kafka consumer */
+        kafkaConsumer = kafkaConsumer(additionalConfig)
+        checkNotNull(kafkaConsumer) {
+            "failed to create kafka consumer for " +
+                    "server(${messageConsumerProperties.bootstrapServers})'s " +
+                    "topics(${messageConsumerProperties.bootstrapServers})"
+        }
+
+        kafkaConsumer!!.subscribe(consumerTopic)
+        log.info("Successfully consumed topic($consumerTopic)")
+
+        val listenerThread = thread(start = true, name = "KafkaConsumer") {
+            keepGoing = true
+            kafkaConsumer!!.use { kc ->
+                while (keepGoing) {
+                    val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+                    runBlocking {
+                        consumerRecords?.forEach { consumerRecord ->
+                            /** execute the command block */
+                            consumerRecord.value()?.let {
+                                launch {
+                                    if (!channel.isClosedForSend) {
+                                        channel.send(it)
+                                    } else {
+                                        log.error("Channel is closed to receive message")
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+        }
+        log.info("Successfully consumed in thread(${listenerThread})")
+        return channel
+    }
+
+    override suspend fun shutDown() {
+        /** Close the Channel */
+        channel.close()
+        /** stop the polling loop */
+        keepGoing = false
+        if (kafkaConsumer != null) {
+            /** sunsubscribe the consumer */
+            kafkaConsumer!!.unsubscribe()
+        }
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
new file mode 100644 (file)
index 0000000..18b86b8
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import io.mockk.every
+import io.mockk.spyk
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.MockConsumer
+import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.common.TopicPartition
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+    BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(properties =
+["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
+    "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
+    "blueprintsprocessor.messageclient.sample.groupId=sample-group",
+    "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic"
+])
+open class BlueprintMessageConsumerServiceTest {
+
+    @Autowired
+    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+    @Test
+    fun testKafkaBasicAuthConsumerService() {
+        runBlocking {
+            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+            val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
+
+            val topic = "default-topic"
+            val partitions: MutableList<TopicPartition> = arrayListOf()
+            val topicsCollection: MutableList<String> = arrayListOf()
+            partitions.add(TopicPartition(topic, 1))
+            val partitionsBeginningMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+            val partitionsEndMap: MutableMap<TopicPartition, Long> = mutableMapOf()
+
+            val records: Long = 10
+            partitions.forEach { partition ->
+                partitionsBeginningMap[partition] = 0L
+                partitionsEndMap[partition] = records
+                topicsCollection.add(partition.topic())
+            }
+            val mockKafkaConsumer = MockConsumer<String, String>(OffsetResetStrategy.EARLIEST)
+            mockKafkaConsumer.subscribe(topicsCollection)
+            mockKafkaConsumer.rebalance(partitions)
+            mockKafkaConsumer.updateBeginningOffsets(partitionsBeginningMap)
+            mockKafkaConsumer.updateEndOffsets(partitionsEndMap)
+            for (i in 1..10) {
+                val record = ConsumerRecord<String, String>(topic, 1, i.toLong(), "key_$i",
+                        "I am message $i")
+                mockKafkaConsumer.addRecord(record)
+            }
+
+            every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
+            val channel = spyBlueprintMessageConsumerService.subscribe(null)
+            launch {
+                channel.consumeEach {
+                    println("Received message : $it")
+                }
+            }
+            //delay(100)
+            spyBlueprintMessageConsumerService.shutDown()
+        }
+    }
+}
\ No newline at end of file
index 0f8367d..0db62c1 100644 (file)
@@ -41,10 +41,10 @@ import kotlin.test.assertTrue
 @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
     BlueprintPropertyConfiguration::class, BluePrintProperties::class])
 @TestPropertySource(properties =
-["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
-    "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
-    "blueprintsprocessor.messageclient.sample.topic=default-topic",
-    "blueprintsprocessor.messageclient.sample.clientId=default-client-id"
+["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092",
+    "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+    "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
 ])
 open class BlueprintMessageProducerServiceTest {
 
@@ -52,10 +52,10 @@ open class BlueprintMessageProducerServiceTest {
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
     @Test
-    fun testKafkaBasicAuthClientService() {
+    fun testKafkaBasicAuthProducertService() {
         runBlocking {
-            val bluePrintMessageClientService = bluePrintMessageLibPropertyService
-                    .blueprintMessageClientService("sample") as KafkaBasicAuthMessageProducerService
+            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
 
             val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>()
 
@@ -64,11 +64,11 @@ open class BlueprintMessageProducerServiceTest {
 
             every { mockKafkaTemplate.send(any(), any()) } returns future
 
-            val spyBluePrintMessageClientService = spyk(bluePrintMessageClientService, recordPrivateCalls = true)
+            val spyBluePrintMessageProducerService = spyk(blueprintMessageProducerService, recordPrivateCalls = true)
 
-            every { spyBluePrintMessageClientService.messageTemplate(any()) } returns mockKafkaTemplate
+            every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
 
-            val response = spyBluePrintMessageClientService.sendMessage("Testing message")
+            val response = spyBluePrintMessageProducerService.sendMessage("Testing message")
             assertTrue(response, "failed to get command response")
         }
     }
index 626b8f9..3868440 100644 (file)
@@ -19,7 +19,7 @@
         <!-- encoders are assigned the type
              ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
         <encoder>
-            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+            <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
         </encoder>
     </appender>
 
index 54cc0c1..e848ed3 100644 (file)
@@ -26,6 +26,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
 import org.springframework.kafka.annotation.KafkaListener
 import org.springframework.stereotype.Service
 
+//TODO("Implement with property service and remove spring bindings")
 @ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
 @Service
 open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
@@ -56,18 +57,16 @@ open class MessagingController(private val propertyService: BluePrintMessageLibP
 
         val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
 
-       if (executionServiceOutput.status.code == EXECUTION_STATUS) {
-           val bluePrintMessageClientService = propertyService
-                   .blueprintMessageClientService(PREFIX)
+        if (executionServiceOutput.status.code == EXECUTION_STATUS) {
+            val blueprintMessageProducerService = propertyService.blueprintMessageProducerService(PREFIX)
 
-           val payload = executionServiceOutput.payload
+            val payload = executionServiceOutput.payload
 
-           log.info("The payload to publish is {}", payload)
+            log.info("The payload to publish is {}", payload)
 
-            bluePrintMessageClientService.sendMessage(payload)
-       }
-        else {
-           log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
-       }
+            blueprintMessageProducerService.sendMessage(payload)
+        } else {
+            log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
+        }
     }
 }