Kafka Messaging Controller API. 01/88301/23
authorprathameshmo <prathamesh.morde@bell.ca>
Tue, 11 Jun 2019 21:47:42 +0000 (17:47 -0400)
committerprathameshmo <prathamesh.morde@bell.ca>
Tue, 25 Jun 2019 21:03:44 +0000 (17:03 -0400)
Things done-
Addressed review comments.
Logic to consume events and process it.
Added integration testing.

Change-Id: If574a363f9fb8581018cc5a7ba106251a9d8caf1
Issue-ID:CCSDK-1356
Signed-off-by: prathamesh morde <prathamesh.morde@bell.ca>
Signed-off-by: prathameshmo <prathamesh.morde@bell.ca>
17 files changed:
ms/blueprintsprocessor/application/src/main/java/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintProcessorApplication.java
ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
ms/blueprintsprocessor/application/src/main/resources/application.properties
ms/blueprintsprocessor/application/src/test/java/org/onap/ccsdk/cds/blueprintsprocessor/BlueprintProcessorApplicationTest.java
ms/blueprintsprocessor/application/src/test/resources/application.properties
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/pom.xml
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip [new file with mode: 0644]
ms/blueprintsprocessor/parent/pom.xml

index 2b6f8bc..c6400db 100644 (file)
@@ -29,8 +29,7 @@ import org.springframework.context.annotation.ComponentScan;
  */
 @SpringBootApplication
 @EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
-@ComponentScan(basePackages = {"org.onap.ccsdk.cds.controllerblueprints",
-    "org.onap.ccsdk.cds.blueprintsprocessor"})
+@ComponentScan(basePackages = {"org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"})
 public class BlueprintProcessorApplication {
 
     public static void main(String[] args) {
index a94fdf3..fae1adb 100755 (executable)
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
 #
 #  Copyright � 2017-2018 AT&T Intellectual Property.
 #
@@ -73,3 +74,12 @@ blueprintsprocessor.cliExecutor.enabled=true
 ### If enabling remote python executor, set this value to true
 ### blueprintprocessor.remoteScriptCommand.enabled=true
 blueprintprocessor.remoteScriptCommand.enabled=false
+
+# Kafka-message-lib Configurations
+blueprintsprocessor.messageclient.self-service-api.topic=producer.t
+blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
+blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false
index 1319d9f..d6e7dc8 100755 (executable)
@@ -72,3 +72,11 @@ blueprintsprocessor.restclient.primary-aai-data.url=https://aai.onap:8443
 blueprintsprocessor.restclient.primary-aai-data.username=aai@aai.onap.org
 blueprintsprocessor.restclient.primary-aai-data.password=demo123456!
 
+# Kafka-message-lib Configuration
+blueprintsprocessor.messageclient.self-service-api.topic=producer.t
+blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
+blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false
index 90783d4..fc36e62 100644 (file)
@@ -16,7 +16,6 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor;
 
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,7 +33,6 @@ import org.springframework.test.context.junit4.SpringRunner;
  * @author Brinda Santh
  * DATE : 8/14/2018
  */
-
 @RunWith(SpringRunner.class)
 @ContextConfiguration(classes = {BlueprintProcessorApplication.class, BluePrintLoadConfiguration.class})
 @SpringBootTest(classes = BlueprintProcessorApplication.class,
index 09ee651..ea2f976 100644 (file)
@@ -46,3 +46,13 @@ blueprintprocessor.netconfExecutor.enabled=true
 blueprintprocessor.restConfExecutor.enabled=true
 blueprintsprocessor.cliExecutor.enabled=true
 blueprintprocessor.remoteScriptCommand.enabled=false
+
+
+# Kafka-message-lib Configuration
+blueprintsprocessor.messageclient.self-service-api.topic=producer.t
+blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
+blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false
index 52ac346..008e924 100644 (file)
@@ -16,6 +16,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import org.apache.commons.lang.builder.ToStringBuilder
 import org.apache.kafka.clients.producer.ProducerConfig.*
 import org.apache.kafka.common.serialization.StringSerializer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
@@ -27,7 +28,6 @@ import org.springframework.kafka.core.ProducerFactory
 import org.springframework.kafka.support.SendResult
 import org.springframework.util.concurrent.ListenableFutureCallback
 
-
 class KafkaBasicAuthMessageProducerService(
         private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties)
     : BlueprintMessageProducerService {
@@ -64,9 +64,8 @@ class KafkaBasicAuthMessageProducerService(
         return true
     }
 
-
     private fun producerFactory(additionalConfig: Map<String, Any>? = null): ProducerFactory<String, Any> {
-        log.info("Client Properties : $messageProducerProperties")
+        log.info("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
         val configProps = hashMapOf<String, Any>()
         configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
         configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
index 340f2c6..89ad720 100755 (executable)
@@ -42,6 +42,7 @@
             <artifactId>proto-definition</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.onap.ccsdk.cds.controllerblueprints</groupId>
             <artifactId>blueprint-core</artifactId>
             <artifactId>h2</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <!-- For Message libraries -->
+        <dependency>
+            <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+            <artifactId>message-lib</artifactId>
+        </dependency>
+
+        <!-- For spring-kafka -->
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Apache Kafka -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
new file mode 100644 (file)
index 0000000..a04a799
--- /dev/null
@@ -0,0 +1,47 @@
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.support.serializer.JsonDeserializer
+
+@Configuration
+open class MessagingConfig {
+
+    @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
+    lateinit var groupId: String
+
+    @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
+    lateinit var bootstrapServers: String
+
+    open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
+        val configProperties = hashMapOf<String, Any>()
+        configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+        configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
+        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
+        configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+        return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(), JsonDeserializer(ExecutionServiceInput::class.java))
+    }
+
+    /**
+     *  Creation of a Kafka MessageListener Container
+     *
+     *  @return KafkaListener instance.
+     */
+    @Bean
+    open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
+        val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
+        factory.consumerFactory = consumerFactory()
+        return factory
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
new file mode 100644 (file)
index 0000000..1d219a8
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.async
+import kotlinx.coroutines.runBlocking
+import org.apache.commons.lang3.builder.ToStringBuilder
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.stereotype.Service
+
+@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
+@Service
+open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
+                               private val executionServiceHandler: ExecutionServiceHandler) {
+
+    private val log = LoggerFactory.getLogger(MessagingController::class.java)!!
+
+    companion object {
+        // TODO PREFIX should be retrieved from model or from request.
+        const val PREFIX = "self-service-api"
+        const val EXECUTION_STATUS = 200
+    }
+
+    @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
+    open fun receive(input: ExecutionServiceInput) {
+
+        log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+
+        runBlocking {
+            log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(input))
+
+            // Process the message.
+            async {
+                processMessage(input)
+            }
+        }
+    }
+
+    private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
+
+        val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+
+       if (executionServiceOutput.status.code == EXECUTION_STATUS) {
+           val bluePrintMessageClientService = propertyService
+                   .blueprintMessageClientService(PREFIX)
+
+           val payload = executionServiceOutput.payload
+
+           log.info("The payload to publish is {}", payload)
+
+            bluePrintMessageClientService.sendMessage(payload)
+       }
+        else {
+           log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
+       }
+    }
+}
index fd764d7..e084c60 100644 (file)
@@ -23,6 +23,8 @@ import io.grpc.testing.GrpcServerRule
 import org.junit.Rule
 import org.junit.Test
 import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.MessagingControllerTest
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.ProducerConfiguration
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
 import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
@@ -33,6 +35,7 @@ import org.onap.ccsdk.cds.controllerblueprints.management.api.FileChunk
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.FilterType
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
@@ -44,7 +47,9 @@ import kotlin.test.assertTrue
 @RunWith(SpringRunner::class)
 @EnableAutoConfiguration
 @DirtiesContext
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
+        excludeFilters = [ComponentScan.Filter(value = [MessagingConfig::class, MessagingController::class, ProducerConfiguration::class,
+            MessagingControllerTest.ConsumerConfiguration::class, MessagingControllerTest::class], type = FilterType.ASSIGNABLE_TYPE)])
 @TestPropertySource(locations = ["classpath:application-test.properties"])
 class BluePrintManagementGRPCHandlerTest {
 
index f8b972e..5072b3c 100644 (file)
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.FilterType
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
@@ -45,7 +46,8 @@ import kotlin.test.BeforeTest
 @RunWith(SpringRunner::class)
 @DirtiesContext
 @EnableAutoConfiguration
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
+        excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
 @TestPropertySource(locations = ["classpath:application-test.properties"])
 class BluePrintProcessingGRPCHandlerTest {
     private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandlerTest::class.java)
index 9cbd898..65b4126 100644 (file)
@@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.autoconfigure.security.SecurityProperties
 import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
 import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.FilterType
 import org.springframework.core.io.ByteArrayResource
 import org.springframework.http.client.MultipartBodyBuilder
 import org.springframework.test.context.ContextConfiguration
@@ -49,7 +50,8 @@ import kotlin.test.assertTrue
 @RunWith(SpringRunner::class)
 @WebFluxTest
 @ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, BluePrintCatalogService::class, SecurityProperties::class])
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
+        excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
 @TestPropertySource(locations = ["classpath:application-test.properties"])
 class ExecutionServiceHandlerTest {
 
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
new file mode 100644 (file)
index 0000000..f7459f5
--- /dev/null
@@ -0,0 +1,211 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
+
+import com.fasterxml.jackson.databind.node.ObjectNode
+import kotlinx.coroutines.reactive.awaitSingle
+import kotlinx.coroutines.runBlocking
+import org.apache.commons.lang.builder.ToStringBuilder
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.MessagingController
+import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.autoconfigure.security.SecurityProperties
+import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+import org.springframework.core.io.ByteArrayResource
+import org.springframework.http.client.MultipartBodyBuilder
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.kafka.annotation.PartitionOffset
+import org.springframework.kafka.annotation.TopicPartition
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.test.context.EmbeddedKafka
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import org.springframework.test.web.reactive.server.WebTestClient
+import org.springframework.test.web.reactive.server.returnResult
+import org.springframework.web.reactive.function.BodyInserters
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.Paths
+import kotlin.test.assertNotNull
+
+@RunWith(SpringRunner::class)
+@EnableAutoConfiguration
+@ContextConfiguration(classes = [MessagingControllerTest::class, SecurityProperties::class])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@TestPropertySource(locations = ["classpath:application-test.properties"])
+@DirtiesContext
+@EmbeddedKafka(ports = [9092])
+@WebFluxTest
+class MessagingControllerTest {
+
+    private val log = LoggerFactory.getLogger(MessagingControllerTest::class.java)!!
+
+    @Autowired
+    lateinit var controller: MessagingController
+
+    @Value("\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}")
+    lateinit var topicUsedForConsumer: String
+
+    @Autowired
+    lateinit var kt: KafkaTemplate<String, ExecutionServiceInput>
+
+    @Autowired
+    lateinit var webTestClient: WebTestClient
+
+    var receivedEvent: String? = null
+
+    @Before
+    fun setup() {
+        deleteDir("target", "blueprints")
+        uploadBluePrint()
+    }
+
+    @After
+    fun clean() {
+        deleteDir("target", "blueprints")
+    }
+
+    @Test
+    fun testReceive() {
+        val samplePayload = "{\n" +
+                "    \"resource-assignment-request\": {\n" +
+                "      \"artifact-name\": [\"hostname\"],\n" +
+                "      \"store-result\": true,\n" +
+                "      \"resource-assignment-properties\" : {\n" +
+                "        \"hostname\": \"demo123\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }"
+
+        kt.defaultTopic = topicUsedForConsumer
+
+        val input = ExecutionServiceInput().apply {
+            commonHeader = CommonHeader().apply {
+                originatorId = "1"
+                requestId = "1234"
+                subRequestId = "1234-1234"
+            }
+
+            actionIdentifiers = ActionIdentifiers().apply {
+                blueprintName = "golden"
+                blueprintVersion = "1.0.0"
+                actionName = "resource-assignment"
+                mode = "sync"
+            }
+
+            stepData = StepData().apply {
+                name = "resource-assignment"
+            }
+
+            payload = JacksonUtils.jsonNode(samplePayload) as ObjectNode
+        }
+
+        kt.sendDefault(input)
+        log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input))
+
+        Thread.sleep(1000)
+    }
+
+    @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])])
+    fun receivedEventFromBluePrintProducer(event: ExecutionServiceInput) {
+        assertNotNull(event)
+    }
+
+    private fun uploadBluePrint() {
+        runBlocking {
+            val body = MultipartBodyBuilder().apply {
+                part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) {
+                    override fun getFilename(): String {
+                        return "test-cba.zip"
+                    }
+                })
+            }.build()
+
+            webTestClient
+                    .post()
+                    .uri("/api/v1/execution-service/upload")
+                    .body(BodyInserters.fromMultipartData(body))
+                    .exchange()
+                    .expectStatus().isOk
+                    .returnResult<String>()
+                    .responseBody
+                    .awaitSingle()
+        }
+    }
+
+    private fun loadCbaArchive():File {
+        return Paths.get("./src/test/resources/cba-for-kafka-integration.zip").toFile()
+    }
+
+    @Configuration
+    @EnableKafka
+    open class ConsumerConfiguration {
+
+        @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
+        lateinit var bootstrapServers: String
+
+        @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
+        lateinit var groupId:String
+
+        @Bean
+        open fun consumerFactory2(): ConsumerFactory<String, ExecutionServiceInput>? {
+            val configProperties = hashMapOf<String, Any>()
+            configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+            configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+            configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
+            configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
+            configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
+            configProperties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 1000
+
+            return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
+                    JsonDeserializer(ExecutionServiceInput::class.java))
+        }
+
+        @Bean
+        open fun listenerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
+            val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
+            factory.consumerFactory = consumerFactory2()
+            return factory
+        }
+    }
+}
+
+
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt
new file mode 100644 (file)
index 0000000..dc1f38a
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.core.ProducerFactory
+import org.springframework.kafka.support.serializer.JsonSerializer
+
+@Configuration
+open class ProducerConfiguration {
+
+    @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
+    lateinit var bootstrapServers: String
+
+    open fun kpf(): ProducerFactory<String, ExecutionServiceInput> {
+        val configs = HashMap<String, Any>()
+        configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+        configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+        configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
+        return DefaultKafkaProducerFactory(configs)
+    }
+
+    @Bean
+    open fun kt(): KafkaTemplate<String, ExecutionServiceInput> {
+        return KafkaTemplate<String, ExecutionServiceInput>(kpf())
+    }
+}
\ No newline at end of file
index 6705523..d532b15 100644 (file)
@@ -31,3 +31,12 @@ blueprintsprocessor.blueprintArchivePath=./target/blueprints/archive
 # Python executor
 blueprints.processor.functions.python.executor.executionPath=./../../../../components/scripts/python/ccsdk_blueprints
 blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints
+
+# Kafka-message-lib Configuration
+blueprintsprocessor.messageclient.self-service-api.kafkaEnable=true
+blueprintsprocessor.messageclient.self-service-api.topic=producer.t
+blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
+blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip
new file mode 100644 (file)
index 0000000..2307038
Binary files /dev/null and b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/cba-for-kafka-integration.zip differ
index 82c2e61..cd0f4bc 100755 (executable)
                 <scope>test</scope>
             </dependency>
 
+            <!-- message-lib dependency -->
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>message-lib</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>