Add Kafka client services 15/88115/3
authorBrinda Santh <brindasanth@in.ibm.com>
Mon, 20 May 2019 19:27:55 +0000 (15:27 -0400)
committerSteve Siani <alphonse.steve.siani.djissitchi@ibm.com>
Thu, 23 May 2019 04:15:54 +0000 (00:15 -0400)
Change-Id: I76cf52f6df10e114539c4d65620f431e0f747644
Issue-ID: CCSDK-1349
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
ms/blueprintsprocessor/modules/commons/message-lib/pom.xml [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/pom.xml
ms/blueprintsprocessor/parent/pom.xml

diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/pom.xml b/ms/blueprintsprocessor/modules/commons/message-lib/pom.xml
new file mode 100644 (file)
index 0000000..d423dfd
--- /dev/null
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Copyright © 2019 IBM.
+  ~
+  ~  Licensed under the Apache License, Version 2.0 (the "License");
+  ~  you may not use this file except in compliance with the License.
+  ~  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>commons</artifactId>
+        <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+        <version>0.5.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>message-lib</artifactId>
+    <name>Blueprints Processor Messaging Lib</name>
+    <description>Blueprints Processor Messaging Lib</description>
+    <dependencies>
+        <dependency>
+            <groupId>org.onap.ccsdk.cds.controllerblueprints</groupId>
+            <artifactId>blueprint-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+            <artifactId>processor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
new file mode 100644 (file)
index 0000000..644c518
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+@ComponentScan
+@EnableConfigurationProperties
+open class BluePrintMessageLibConfiguration
+
+class MessageLibConstants {
+    companion object {
+        const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
+        const val PROPERTY_MESSAGE_CLIENT_PREFIX = "blueprintsprocessor.messageclient."
+        const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
new file mode 100644 (file)
index 0000000..e621ec6
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+
+open class MessageProducerProperties
+
+
+open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
+    lateinit var bootstrapServers: String
+    var topic: String? = null
+    var clientId: String? = null
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
new file mode 100644 (file)
index 0000000..fb01ce1
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import com.fasterxml.jackson.databind.JsonNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import org.springframework.stereotype.Service
+
+@Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
+open class BluePrintMessageLibPropertyService(private var bluePrintProperties: BluePrintProperties) {
+
+    fun blueprintMessageClientService(jsonNode: JsonNode): BlueprintMessageProducerService {
+        val messageClientProperties = messageClientProperties(jsonNode)
+        return blueprintMessageClientService(messageClientProperties)
+    }
+
+    fun blueprintMessageClientService(selector: String): BlueprintMessageProducerService {
+        val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_CLIENT_PREFIX}$selector"
+        val messageClientProperties = messageClientProperties(prefix)
+        return blueprintMessageClientService(messageClientProperties)
+    }
+
+    fun messageClientProperties(prefix: String): MessageProducerProperties {
+        val type = bluePrintProperties.propertyBeanType("$prefix.type", String::class.java)
+        return when (type) {
+            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+                kafkaBasicAuthMessageClientProperties(prefix)
+            }
+            else -> {
+                throw BluePrintProcessorException("Message adaptor($type) is not supported")
+            }
+        }
+    }
+
+    fun messageClientProperties(jsonNode: JsonNode): MessageProducerProperties {
+        val type = jsonNode.get("type").textValue()
+        return when (type) {
+            MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
+            }
+            else -> {
+                throw BluePrintProcessorException("Message adaptor($type) is not supported")
+            }
+        }
+    }
+
+    private fun blueprintMessageClientService(MessageProducerProperties: MessageProducerProperties)
+            : BlueprintMessageProducerService {
+
+        when (MessageProducerProperties) {
+            is KafkaBasicAuthMessageProducerProperties -> {
+                return KafkaBasicAuthMessageProducerService(MessageProducerProperties)
+            }
+            else -> {
+                throw BluePrintProcessorException("couldn't get Message client service for")
+            }
+        }
+    }
+
+    private fun kafkaBasicAuthMessageClientProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
+        return bluePrintProperties.propertyBeanType(
+                prefix, KafkaBasicAuthMessageProducerProperties::class.java)
+    }
+
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
new file mode 100644 (file)
index 0000000..e33d41c
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.runBlocking
+
+interface BlueprintMessageProducerService {
+
+    fun sendMessage(message: Any): Boolean = runBlocking {
+        sendMessageNB(message)
+    }
+
+    fun sendMessage(topic: String, message: Any): Boolean = runBlocking {
+        sendMessageNB(topic, message)
+    }
+
+    suspend fun sendMessageNB(message: Any): Boolean
+
+    suspend fun sendMessageNB(topic: String, message: Any): Boolean
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
new file mode 100644 (file)
index 0000000..52ac346
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.clients.producer.ProducerConfig.*
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.slf4j.LoggerFactory
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.core.ProducerFactory
+import org.springframework.kafka.support.SendResult
+import org.springframework.util.concurrent.ListenableFutureCallback
+
+
+class KafkaBasicAuthMessageProducerService(
+        private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
+    : BlueprintMessageProducerService {
+
+    private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
+
+    private var kafkaTemplate: KafkaTemplate<String, Any>? = null
+
+    override suspend fun sendMessageNB(message: Any): Boolean {
+        checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
+        return sendMessage(messageProducerProperties.topic!!, message)
+    }
+
+    override suspend fun sendMessageNB(topic: String, message: Any): Boolean {
+        val serializedMessage = when (message) {
+            is String -> {
+                message
+            }
+            else -> {
+                message.asJsonType().toString()
+            }
+        }
+        val future = messageTemplate().send(topic, serializedMessage)
+
+        future.addCallback(object : ListenableFutureCallback<SendResult<String, Any>> {
+            override fun onSuccess(result: SendResult<String, Any>) {
+                log.info("message sent successfully with offset=[${result.recordMetadata.offset()}]")
+            }
+
+            override fun onFailure(ex: Throwable) {
+                log.error("Unable to send message", ex)
+            }
+        })
+        return true
+    }
+
+
+    private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
+        log.info("Client Properties : $messageProducerProperties")
+        val configProps = hashMapOf<String, Any>()
+        configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
+        configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        configProps[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        if (messageProducerProperties.clientId != null) {
+            configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
+        }
+        // TODO("Security Implementation based on type")
+
+        // Add additional Properties
+        if (additionalConfig != null) {
+            configProps.putAll(additionalConfig)
+        }
+        return DefaultKafkaProducerFactory(configProps)
+    }
+
+    fun messageTemplate(additionalConfig: Map<String, Any>? = null): KafkaTemplate<String, Any> {
+        log.info("Prepering templates")
+        if (kafkaTemplate == null) {
+            kafkaTemplate = KafkaTemplate(producerFactory(additionalConfig))
+        }
+        return kafkaTemplate!!
+    }
+}
+
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
new file mode 100644 (file)
index 0000000..0f8367d
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import kotlinx.coroutines.runBlocking
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.SendResult
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import org.springframework.util.concurrent.SettableListenableFuture
+import kotlin.test.Test
+import kotlin.test.assertTrue
+
+
+@RunWith(SpringRunner::class)
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+    BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(properties =
+["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
+    "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
+    "blueprintsprocessor.messageclient.sample.topic=default-topic",
+    "blueprintsprocessor.messageclient.sample.clientId=default-client-id"
+])
+open class BlueprintMessageProducerServiceTest {
+
+    @Autowired
+    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+    @Test
+    fun testKafkaBasicAuthClientService() {
+        runBlocking {
+            val bluePrintMessageClientService = bluePrintMessageLibPropertyService
+                    .blueprintMessageClientService("sample") as KafkaBasicAuthMessageProducerService
+
+            val mockKafkaTemplate = mockk<KafkaTemplate<String, Any>>()
+
+            val future = SettableListenableFuture<SendResult<String, Any>>()
+            //future.setException(BluePrintException("failed sending"))
+
+            every { mockKafkaTemplate.send(any(), any()) } returns future
+
+            val spyBluePrintMessageClientService = spyk(bluePrintMessageClientService, recordPrivateCalls = true)
+
+            every { spyBluePrintMessageClientService.messageTemplate(any()) } returns mockKafkaTemplate
+
+            val response = spyBluePrintMessageClientService.sendMessage("Testing message")
+            assertTrue(response, "failed to get command response")
+        }
+    }
+
+}
+
+
+
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/resources/logback-test.xml
new file mode 100644 (file)
index 0000000..626b8f9
--- /dev/null
@@ -0,0 +1,35 @@
+<!--
+  ~  Copyright © 2019 IBM.
+  ~
+  ~  Licensed under the Apache License, Version 2.0 (the "License");
+  ~  you may not use this file except in compliance with the License.
+  ~  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.springframework.test" level="warn"/>
+    <logger name="org.springframework" level="warn"/>
+    <logger name="org.hibernate" level="info"/>
+    <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+    <root level="warn">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
index 5fdfb50..782ce61 100755 (executable)
@@ -36,6 +36,7 @@
         <module>rest-lib</module>
         <module>dmaap-lib</module>
         <module>grpc-lib</module>
+        <module>message-lib</module>
     </modules>
     <dependencies>
         <dependency>
index 5f3e926..9523287 100755 (executable)
@@ -39,6 +39,8 @@
         <sshd.version>2.2.0</sshd.version>
         <jsch.version>0.1.55</jsch.version>
         <protobuff.java.utils.version>3.6.1</protobuff.java.utils.version>
+        <spring.kafka.version>2.2.6.RELEASE</spring.kafka.version>
+        <kafka.version>2.2.0</kafka.version>
         <eelf.version>1.0.0</eelf.version>
         <guava.version>27.0.1-jre</guava.version>
         <jython.version>2.7.1</jython.version>
                 <scope>import</scope>
             </dependency>
 
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka</artifactId>
+                <version>${spring.kafka.version}</version>
+            </dependency>
+
             <!--Swagger Dependencies -->
             <dependency>
                 <groupId>io.springfox</groupId>
                 <version>${jsch.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-streams</artifactId>
+                <version>${kafka.version}</version>
+            </dependency>
+
             <!-- SLI Version -->
             <dependency>
                 <groupId>org.onap.ccsdk.sli.core</groupId>
                 <version>${grpc.version}</version>
                 <scope>test</scope>
             </dependency>
+
+            <!-- Spring Kafka -->
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka-test</artifactId>
+                <version>${spring.kafka.version}</version>
+                <scope>test</scope>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>