Add Config based blueprint process consumer 57/95057/1
authorBrinda Santh <brindasanth@in.ibm.com>
Thu, 5 Sep 2019 15:06:48 +0000 (11:06 -0400)
committerBrinda Santh <brindasanth@in.ibm.com>
Thu, 5 Sep 2019 15:06:48 +0000 (11:06 -0400)
Change-Id: I9e37ecb5032047f835f3b2ea20b2689c76353497
Issue-ID: CCSDK-1668
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
21 files changed:
ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
ms/blueprintsprocessor/application/src/main/resources/application.properties
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt [deleted file]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt [deleted file]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintManagementGRPCHandlerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandlerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt [deleted file]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt [deleted file]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback-test.xml [moved from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/logback.xml with 91% similarity]
ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/CustomFunctions.kt

index 019c5aa..2fd5951 100755 (executable)
@@ -99,10 +99,15 @@ blueprintsprocessor.cliExecutor.enabled=true
 blueprintprocessor.remoteScriptCommand.enabled=false
 
 # Kafka-message-lib Configurations
-blueprintsprocessor.messageclient.self-service-api.topic=producer.t
-blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
-blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+
+blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
\ No newline at end of file
index 4a4f794..75d9cba 100755 (executable)
@@ -92,10 +92,15 @@ blueprintsprocessor.restclient.aai-data.username=aai@aai.onap.org
 blueprintsprocessor.restclient.aai-data.password=demo123456!
 
 # Kafka-message-lib Configuration
-blueprintsprocessor.messageclient.self-service-api.topic=producer.t
-blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
-blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+
+blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
index 281a970..27a444b 100644 (file)
@@ -59,8 +59,7 @@ fun BluePrintDependencyService.messageConsumerService(jsonNode: JsonNode): Bluep
 class MessageLibConstants {
     companion object {
         const val SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY = "blueprint-message-lib-property-service"
-        //TODO("Change to .messageconsumer in application.properties")
-        const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageclient."
+        const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
         const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
         const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
     }
index c77cdfd..ab04054 100644 (file)
@@ -33,8 +33,9 @@ open class MessageConsumerProperties
 open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
     lateinit var bootstrapServers: String
     lateinit var groupId: String
-    var consumerTopic: String? = null
-    var pollMillSec: Long = 100
+    var clientId: String? = null
+    var topic: String? = null
+    var pollMillSec: Long = 1000
 }
 
 open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
index 076501e..5a9e61b 100644 (file)
@@ -17,6 +17,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.CommonClientConfigs
@@ -47,6 +48,10 @@ class KafkaBasicAuthMessageConsumerService(
         configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
         configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
         configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+        if (messageConsumerProperties.clientId != null) {
+            configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
+        }
+        // TODO("Security Implementation based on type")
         /** add or override already set properties */
         additionalConfig?.let { configProperties.putAll(it) }
         /** Create Kafka consumer */
@@ -55,7 +60,7 @@ class KafkaBasicAuthMessageConsumerService(
 
     override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
         /** get to topic names */
-        val consumerTopic = messageConsumerProperties.consumerTopic?.split(",")?.map { it.trim() }
+        val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
         check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
         return subscribe(consumerTopic, additionalConfig)
     }
@@ -64,6 +69,7 @@ class KafkaBasicAuthMessageConsumerService(
     override suspend fun subscribe(consumerTopic: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
         /** Create Kafka consumer */
         kafkaConsumer = kafkaConsumer(additionalConfig)
+
         checkNotNull(kafkaConsumer) {
             "failed to create kafka consumer for " +
                     "server(${messageConsumerProperties.bootstrapServers})'s " +
@@ -73,7 +79,7 @@ class KafkaBasicAuthMessageConsumerService(
         kafkaConsumer!!.subscribe(consumerTopic)
         log.info("Successfully consumed topic($consumerTopic)")
 
-        val listenerThread = thread(start = true, name = "KafkaConsumer") {
+        thread(start = true, name = "KafkaConsumer") {
             keepGoing = true
             kafkaConsumer!!.use { kc ->
                 while (keepGoing) {
@@ -93,21 +99,18 @@ class KafkaBasicAuthMessageConsumerService(
                         }
                     }
                 }
+                log.info("message listener shutting down.....")
             }
-
         }
-        log.info("Successfully consumed in thread(${listenerThread})")
         return channel
     }
 
     override suspend fun shutDown() {
-        /** Close the Channel */
-        channel.close()
         /** stop the polling loop */
         keepGoing = false
-        if (kafkaConsumer != null) {
-            /** sunsubscribe the consumer */
-            kafkaConsumer!!.unsubscribe()
-        }
+        /** Close the Channel */
+        channel.cancel()
+        /** TO shutdown gracefully, need to wait for the maximum poll time */
+        delay(messageConsumerProperties.pollMillSec)
     }
 }
index 18b86b8..2b84eaa 100644 (file)
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 import io.mockk.every
 import io.mockk.spyk
 import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -30,12 +31,14 @@ import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
 import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
 
 
 @RunWith(SpringRunner::class)
@@ -43,12 +46,20 @@ import kotlin.test.assertNotNull
 @ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
     BlueprintPropertyConfiguration::class, BluePrintProperties::class])
 @TestPropertySource(properties =
-["blueprintsprocessor.messageclient.sample.type=kafka-basic-auth",
-    "blueprintsprocessor.messageclient.sample.bootstrapServers=127:0.0.1:9092",
-    "blueprintsprocessor.messageclient.sample.groupId=sample-group",
-    "blueprintsprocessor.messageclient.sample.consumerTopic=default-topic"
+["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+    "blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
+    "blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
+    "blueprintsprocessor.messageconsumer.sample.topic=default-topic",
+    "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
+    "blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+
+    "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+    "blueprintsprocessor.messageproducer.sample.topic=default-topic",
+    "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
 ])
 open class BlueprintMessageConsumerServiceTest {
+    val log = logger(BlueprintMessageConsumerServiceTest::class)
 
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -90,11 +101,40 @@ open class BlueprintMessageConsumerServiceTest {
             val channel = spyBlueprintMessageConsumerService.subscribe(null)
             launch {
                 channel.consumeEach {
-                    println("Received message : $it")
+                    assertTrue(it.startsWith("I am message"), "failed to get the actual message")
                 }
             }
-            //delay(100)
+            delay(10)
             spyBlueprintMessageConsumerService.shutDown()
         }
     }
+
+    /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+    //@Test
+    fun testKafkaIntegration() {
+        runBlocking {
+            val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+            assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
+
+            val channel = blueprintMessageConsumerService.subscribe(null)
+            launch {
+                channel.consumeEach {
+                    log.info("Consumed Message : $it")
+                }
+            }
+
+            /** Send message with every 1 sec */
+            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+            launch {
+                repeat(5) {
+                    delay(1000)
+                    blueprintMessageProducerService.sendMessage("this is my message($it)")
+                }
+            }
+            delay(10000)
+            blueprintMessageConsumerService.shutDown()
+        }
+    }
 }
\ No newline at end of file
index 0db62c1..31bcc15 100644 (file)
@@ -42,7 +42,7 @@ import kotlin.test.assertTrue
     BlueprintPropertyConfiguration::class, BluePrintProperties::class])
 @TestPropertySource(properties =
 ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
-    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127:0.0.1:9092",
+    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
     "blueprintsprocessor.messageproducer.sample.topic=default-topic",
     "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
 ])
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
new file mode 100644 (file)
index 0000000..b339903
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.event.ApplicationReadyEvent
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Service
+import javax.annotation.PreDestroy
+
+@ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
+        havingValue = "true")
+@Service
+open class BluePrintProcessingKafkaConsumer(
+        private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+        private val executionServiceHandler: ExecutionServiceHandler) {
+
+    val log = logger(BluePrintProcessingKafkaConsumer::class)
+
+    private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
+
+    companion object {
+        const val CONSUMER_SELECTOR = "self-service-api"
+        const val PRODUCER_SELECTOR = "self-service-api"
+    }
+
+    @EventListener(ApplicationReadyEvent::class)
+    fun setupMessageListener() = runBlocking {
+        try {
+            log.info("Setting up message consumer($CONSUMER_SELECTOR) and " +
+                    "message producer($PRODUCER_SELECTOR)...")
+
+            /** Get the Message Consumer Service **/
+            blueprintMessageConsumerService = try {
+                bluePrintMessageLibPropertyService
+                        .blueprintMessageConsumerService(CONSUMER_SELECTOR)
+            } catch (e: Exception) {
+                throw BluePrintProcessorException("failed to create consumer service ${e.message}")
+            }
+
+            /** Get the Message Producer Service **/
+            val blueprintMessageProducerService = try {
+                bluePrintMessageLibPropertyService
+                        .blueprintMessageProducerService(PRODUCER_SELECTOR)
+            } catch (e: Exception) {
+                throw BluePrintProcessorException("failed to create producer service ${e.message}")
+            }
+
+            launch {
+                /** Subscribe to the consumer topics */
+                val additionalConfig: MutableMap<String, Any> = hashMapOf()
+                val channel = blueprintMessageConsumerService.subscribe(additionalConfig)
+                channel.consumeEach { message ->
+                    launch {
+                        try {
+                            log.trace("Consumed Message : $message")
+                            val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+                            val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+                            //TODO("In future, Message publisher configuration vary with respect to request")
+                            /** Send the response message */
+                            blueprintMessageProducerService.sendMessage(executionServiceOutput)
+                        } catch (e: Exception) {
+                            log.error("failed in processing the consumed message : $message", e)
+                        }
+                    }
+                }
+            }
+        } catch (e: Exception) {
+            log.error("failed to start message consumer($CONSUMER_SELECTOR) and " +
+                    "message producer($PRODUCER_SELECTOR) ", e)
+        }
+    }
+
+    @PreDestroy
+    fun shutdownMessageListener() = runBlocking {
+        try {
+            log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " +
+                    "message producer($PRODUCER_SELECTOR)...")
+            blueprintMessageConsumerService.shutDown()
+        } catch (e: Exception) {
+            log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingConfig.kt
deleted file mode 100644 (file)
index 17e157d..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-
-import com.fasterxml.jackson.databind.DeserializationFeature
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
-import org.springframework.kafka.core.ConsumerFactory
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
-import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
-import org.springframework.kafka.support.serializer.JsonDeserializer
-
-@Configuration
-open class MessagingConfig {
-
-    @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
-    lateinit var groupId: String
-
-    @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
-    lateinit var bootstrapServers: String
-
-    open fun consumerFactory(): ConsumerFactory<String, ExecutionServiceInput>? {
-        val configProperties = hashMapOf<String, Any>()
-        configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
-        configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
-        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
-        configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
-        configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer2::class.java
-        configProperties[ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java.name
-
-        val deserializer = JsonDeserializer<ExecutionServiceInput>()
-        deserializer.setRemoveTypeHeaders(true)
-        deserializer.addTrustedPackages("*")
-
-        val jsonDeserializer =  JsonDeserializer(ExecutionServiceInput::class.java,
-                ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false))
-
-        return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
-                ErrorHandlingDeserializer2<ExecutionServiceInput>(jsonDeserializer))
-    }
-
-    /**
-     *  Creation of a Kafka MessageListener Container
-     *
-     *  @return KafkaListener instance.
-     */
-    @Bean
-    open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
-        val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
-        factory.consumerFactory = consumerFactory()
-        return factory
-    }
-}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/MessagingController.kt
deleted file mode 100644 (file)
index e848ed3..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright © 2019 Bell Canada
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-
-import kotlinx.coroutines.async
-import kotlinx.coroutines.runBlocking
-import org.apache.commons.lang3.builder.ToStringBuilder
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
-import org.slf4j.LoggerFactory
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
-import org.springframework.kafka.annotation.KafkaListener
-import org.springframework.stereotype.Service
-
-//TODO("Implement with property service and remove spring bindings")
-@ConditionalOnProperty(name = ["blueprintsprocessor.messageclient.self-service-api.kafkaEnable"], havingValue = "true")
-@Service
-open class MessagingController(private val propertyService: BluePrintMessageLibPropertyService,
-                               private val executionServiceHandler: ExecutionServiceHandler) {
-
-    private val log = LoggerFactory.getLogger(MessagingController::class.java)!!
-
-    companion object {
-        // TODO PREFIX should be retrieved from model or from request.
-        const val PREFIX = "self-service-api"
-        const val EXECUTION_STATUS = 200
-    }
-
-    @KafkaListener(topics = ["\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}"])
-    open fun receive(record: ConsumerRecord<String, ExecutionServiceInput>) {
-
-        runBlocking {
-            log.info("Successfully received a message: {}", ToStringBuilder.reflectionToString(record.value()))
-
-            // Process the message.
-            async {
-                processMessage(record.value())
-            }.await()
-        }
-    }
-
-    private suspend fun processMessage(executionServiceInput: ExecutionServiceInput) {
-
-        val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
-
-        if (executionServiceOutput.status.code == EXECUTION_STATUS) {
-            val blueprintMessageProducerService = propertyService.blueprintMessageProducerService(PREFIX)
-
-            val payload = executionServiceOutput.payload
-
-            log.info("The payload to publish is {}", payload)
-
-            blueprintMessageProducerService.sendMessage(payload)
-        } else {
-            log.error("Fail to process the given event due to {}", executionServiceOutput.status.errorMessage)
-        }
-    }
-}
index ea05e88..9629aa4 100644 (file)
@@ -23,8 +23,6 @@ import io.grpc.testing.GrpcServerRule
 import org.junit.Rule
 import org.junit.Test
 import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.MessagingControllerTest
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib.ProducerConfiguration
 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
 import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
@@ -33,7 +31,6 @@ import org.onap.ccsdk.cds.controllerblueprints.management.api.*
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.FilterType
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
@@ -45,9 +42,7 @@ import kotlin.test.assertTrue
 @RunWith(SpringRunner::class)
 @EnableAutoConfiguration
 @DirtiesContext
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
-        excludeFilters = [ComponentScan.Filter(value = [MessagingConfig::class, MessagingController::class, ProducerConfiguration::class,
-            MessagingControllerTest.ConsumerConfiguration::class, MessagingControllerTest::class], type = FilterType.ASSIGNABLE_TYPE)])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
 @TestPropertySource(locations = ["classpath:application-test.properties"])
 class BluePrintManagementGRPCHandlerTest {
 
index ce5acd4..8bedc96 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Copyright © 2017-2018 AT&T Intellectual Property.
  * Modifications Copyright © 2019 Bell Canada.
+ * Modifications Copyright © 2019 IBM.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.FilterType
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
@@ -44,8 +44,8 @@ import kotlin.test.BeforeTest
 @RunWith(SpringRunner::class)
 @DirtiesContext
 @EnableAutoConfiguration
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
-        excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor",
+    "org.onap.ccsdk.cds.controllerblueprints"])
 @TestPropertySource(locations = ["classpath:application-test.properties"])
 class BluePrintProcessingGRPCHandlerTest {
     private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandlerTest::class.java)
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumerTest.kt
new file mode 100644 (file)
index 0000000..7d43f53
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import io.mockk.coEvery
+import io.mockk.mockk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+@RunWith(SpringRunner::class)
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+    BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(locations = ["classpath:application-test.properties"])
+class BluePrintProcessingKafkaConsumerTest {
+
+    @Autowired
+    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+    @Test
+    fun testExecutionInputMessageConsumer() {
+        runBlocking {
+            assertNotNull(bluePrintMessageLibPropertyService,
+                    "failed to initialise bluePrintMessageLibPropertyService")
+
+            val executionServiceHandle = mockk<ExecutionServiceHandler>()
+
+            coEvery { executionServiceHandle.doProcess(any()) } returns mockk()
+
+            val bluePrintProcessingKafkaConsumer = BluePrintProcessingKafkaConsumer(bluePrintMessageLibPropertyService,
+                    executionServiceHandle)
+
+            launch {
+                bluePrintProcessingKafkaConsumer.setupMessageListener()
+            }
+            delay(100)
+            bluePrintProcessingKafkaConsumer.shutdownMessageListener()
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceControllerTest.kt
new file mode 100644 (file)
index 0000000..fc6c489
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Copyright © 2019 Bell Canada
+ * Modifications Copyright © 2019 IBM.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import kotlinx.coroutines.reactive.awaitSingle
+import kotlinx.coroutines.runBlocking
+import org.junit.After
+import org.junit.Before
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.autoconfigure.security.SecurityProperties
+import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.core.io.ByteArrayResource
+import org.springframework.http.client.MultipartBodyBuilder
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import org.springframework.test.web.reactive.server.WebTestClient
+import org.springframework.test.web.reactive.server.returnResult
+import org.springframework.web.reactive.function.BodyInserters
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.Paths
+import kotlin.test.Test
+
+@RunWith(SpringRunner::class)
+@EnableAutoConfiguration
+@ContextConfiguration(classes = [ExecutionServiceControllerTest::class, SecurityProperties::class])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
+@TestPropertySource(locations = ["classpath:application-test.properties"])
+@DirtiesContext
+@WebFluxTest
+class ExecutionServiceControllerTest {
+
+    private val log = LoggerFactory.getLogger(ExecutionServiceControllerTest::class.java)!!
+
+    @Autowired
+    lateinit var webTestClient: WebTestClient
+
+    var event: ExecutionServiceInput? = null
+
+    @Before
+    fun setup() {
+        deleteDir("target", "blueprints")
+    }
+
+    @After
+    fun clean() {
+        deleteDir("target", "blueprints")
+    }
+
+    @Test
+    fun uploadBluePrint() {
+        runBlocking {
+            val body = MultipartBodyBuilder().apply {
+                part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) {
+                    override fun getFilename(): String {
+                        return "test-cba.zip"
+                    }
+                })
+            }.build()
+
+            webTestClient
+                    .post()
+                    .uri("/api/v1/execution-service/upload")
+                    .body(BodyInserters.fromMultipartData(body))
+                    .exchange()
+                    .expectStatus().isOk
+                    .returnResult<String>()
+                    .responseBody
+                    .awaitSingle()
+        }
+    }
+
+    private fun loadCbaArchive(): File {
+        return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile()
+    }
+}
+
+
index d9e352b..a480b11 100644 (file)
@@ -30,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.autoconfigure.security.SecurityProperties
 import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
 import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.FilterType
 import org.springframework.core.io.ByteArrayResource
 import org.springframework.http.client.MultipartBodyBuilder
 import org.springframework.test.context.ContextConfiguration
@@ -49,9 +48,10 @@ import kotlin.test.assertTrue
 
 @RunWith(SpringRunner::class)
 @WebFluxTest
-@ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class, BluePrintCatalogService::class, SecurityProperties::class])
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"],
-        excludeFilters =arrayOf(ComponentScan.Filter(value = [(MessagingController::class)], type = FilterType.ASSIGNABLE_TYPE)))
+@ContextConfiguration(classes = [ExecutionServiceHandler::class, BluePrintCoreConfiguration::class,
+    BluePrintCatalogService::class, SecurityProperties::class])
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor",
+    "org.onap.ccsdk.cds.controllerblueprints"])
 @TestPropertySource(locations = ["classpath:application-test.properties"])
 class ExecutionServiceHandlerTest {
 
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/MessagingControllerTest.kt
deleted file mode 100644 (file)
index facbec5..0000000
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Copyright © 2019 Bell Canada
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
-
-import com.fasterxml.jackson.databind.node.ObjectNode
-import kotlinx.coroutines.reactive.awaitSingle
-import kotlinx.coroutines.runBlocking
-import org.apache.commons.lang.builder.ToStringBuilder
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.After
-import org.junit.Before
-import org.junit.Ignore
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StepData
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.MessagingController
-import org.onap.ccsdk.cds.controllerblueprints.core.deleteDir
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import org.slf4j.LoggerFactory
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration
-import org.springframework.boot.autoconfigure.security.SecurityProperties
-import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.ComponentScan
-import org.springframework.context.annotation.Configuration
-import org.springframework.core.io.ByteArrayResource
-import org.springframework.http.client.MultipartBodyBuilder
-import org.springframework.kafka.annotation.EnableKafka
-import org.springframework.kafka.annotation.KafkaListener
-import org.springframework.kafka.annotation.PartitionOffset
-import org.springframework.kafka.annotation.TopicPartition
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
-import org.springframework.kafka.core.ConsumerFactory
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.serializer.JsonDeserializer
-import org.springframework.kafka.test.context.EmbeddedKafka
-import org.springframework.test.annotation.DirtiesContext
-import org.springframework.test.context.ContextConfiguration
-import org.springframework.test.context.TestPropertySource
-import org.springframework.test.context.junit4.SpringRunner
-import org.springframework.test.web.reactive.server.WebTestClient
-import org.springframework.test.web.reactive.server.returnResult
-import org.springframework.web.reactive.function.BodyInserters
-import java.io.File
-import java.nio.file.Files
-import java.nio.file.Paths
-import kotlin.test.assertNotNull
-//FIXME("testReceive method is failing in server build, It is not stable, may be timing issue.")
-@Ignore
-@RunWith(SpringRunner::class)
-@EnableAutoConfiguration
-@ContextConfiguration(classes = [MessagingControllerTest::class, SecurityProperties::class])
-@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor", "org.onap.ccsdk.cds.controllerblueprints"])
-@TestPropertySource(locations = ["classpath:application-test.properties"])
-@DirtiesContext
-@EmbeddedKafka(ports = [9092])
-@WebFluxTest
-class MessagingControllerTest {
-
-    private val log = LoggerFactory.getLogger(MessagingControllerTest::class.java)!!
-
-    @Autowired
-    lateinit var controller: MessagingController
-
-    @Value("\${blueprintsprocessor.messageclient.self-service-api.consumerTopic}")
-    lateinit var topicUsedForConsumer: String
-
-    @Autowired
-    lateinit var kt: KafkaTemplate<String, ExecutionServiceInput>
-
-    @Autowired
-    lateinit var webTestClient: WebTestClient
-
-    var event: ExecutionServiceInput? = null
-
-    @Before
-    fun setup() {
-        deleteDir("target", "blueprints")
-        uploadBluePrint()
-    }
-
-    @After
-    fun clean() {
-        deleteDir("target", "blueprints")
-    }
-
-    @Test
-    fun testReceive() {
-        val samplePayload = "{\n" +
-                "    \"resource-assignment-request\": {\n" +
-                "      \"artifact-name\": [\"hostname\"],\n" +
-                "      \"store-result\": true,\n" +
-                "      \"resource-assignment-properties\" : {\n" +
-                "        \"hostname\": \"demo123\"\n" +
-                "      }\n" +
-                "    }\n" +
-                "  }"
-
-        kt.defaultTopic = topicUsedForConsumer
-
-        val input = ExecutionServiceInput().apply {
-            commonHeader = CommonHeader().apply {
-                originatorId = "1"
-                requestId = "1234"
-                subRequestId = "1234-1234"
-            }
-
-            actionIdentifiers = ActionIdentifiers().apply {
-                blueprintName = "golden"
-                blueprintVersion = "1.0.0"
-                actionName = "resource-assignment"
-                mode = "sync"
-            }
-
-            stepData = StepData().apply {
-                name = "resource-assignment"
-            }
-
-            payload = JacksonUtils.jsonNode(samplePayload) as ObjectNode
-        }
-
-        kt.sendDefault(input)
-        log.info("test-sender sent message='{}'", ToStringBuilder.reflectionToString(input))
-
-        Thread.sleep(1000)
-
-        assertNotNull(event)
-    }
-
-    @KafkaListener(topicPartitions = [TopicPartition(topic = "\${blueprintsprocessor.messageclient.self-service-api.topic}", partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "0")])])
-    fun receivedEventFromBluePrintProducer(receivedEvent: ExecutionServiceInput) {
-        event = receivedEvent
-    }
-
-    private fun uploadBluePrint() {
-        runBlocking {
-            val body = MultipartBodyBuilder().apply {
-                part("file", object : ByteArrayResource(Files.readAllBytes(loadCbaArchive().toPath())) {
-                    override fun getFilename(): String {
-                        return "test-cba.zip"
-                    }
-                })
-            }.build()
-
-            webTestClient
-                    .post()
-                    .uri("/api/v1/execution-service/upload")
-                    .body(BodyInserters.fromMultipartData(body))
-                    .exchange()
-                    .expectStatus().isOk
-                    .returnResult<String>()
-                    .responseBody
-                    .awaitSingle()
-        }
-    }
-
-    private fun loadCbaArchive():File {
-        return Paths.get("./src/test/resources/cba-for-kafka-integration_enriched.zip").toFile()
-    }
-
-    @Configuration
-    @EnableKafka
-    open class ConsumerConfiguration {
-
-        @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
-        lateinit var bootstrapServers: String
-
-        @Value("\${blueprintsprocessor.messageclient.self-service-api.groupId}")
-        lateinit var groupId:String
-
-        @Bean
-        open fun consumerFactory2(): ConsumerFactory<String, ExecutionServiceInput>? {
-            val configProperties = hashMapOf<String, Any>()
-            configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
-            configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
-            configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
-            configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java.name
-            configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
-            configProperties[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 1000
-
-            return DefaultKafkaConsumerFactory(configProperties, StringDeserializer(),
-                    JsonDeserializer(ExecutionServiceInput::class.java))
-        }
-
-        @Bean
-        open fun listenerFactory(): ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput> {
-            val factory = ConcurrentKafkaListenerContainerFactory<String, ExecutionServiceInput>()
-            factory.consumerFactory = consumerFactory2()
-            return factory
-        }
-    }
-}
-
-
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/messaginglib/ProducerConfiguration.kt
deleted file mode 100644 (file)
index dc1f38a..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright © 2019 Bell Canada
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.messaginglib
-
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization.StringSerializer
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.annotation.EnableKafka
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.core.ProducerFactory
-import org.springframework.kafka.support.serializer.JsonSerializer
-
-@Configuration
-open class ProducerConfiguration {
-
-    @Value("\${blueprintsprocessor.messageclient.self-service-api.bootstrapServers}")
-    lateinit var bootstrapServers: String
-
-    open fun kpf(): ProducerFactory<String, ExecutionServiceInput> {
-        val configs = HashMap<String, Any>()
-        configs[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
-        configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-        configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
-        return DefaultKafkaProducerFactory(configs)
-    }
-
-    @Bean
-    open fun kt(): KafkaTemplate<String, ExecutionServiceInput> {
-        return KafkaTemplate<String, ExecutionServiceInput>(kpf())
-    }
-}
\ No newline at end of file
index ab3bac8..d18b700 100644 (file)
@@ -33,10 +33,15 @@ blueprints.processor.functions.python.executor.executionPath=./../../../../compo
 blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints
 
 # Kafka-message-lib Configuration
-blueprintsprocessor.messageclient.self-service-api.kafkaEnable=true
-blueprintsprocessor.messageclient.self-service-api.topic=producer.t
-blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
-blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=10
+
+blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
@@ -1,35 +1,35 @@
-<!--\r
-  ~ Copyright © 2017-2018 AT&T Intellectual Property.\r
-  ~\r
-  ~ Licensed under the Apache License, Version 2.0 (the "License");\r
-  ~ you may not use this file except in compliance with the License.\r
-  ~ You may obtain a copy of the License at\r
-  ~\r
-  ~     http://www.apache.org/licenses/LICENSE-2.0\r
-  ~\r
-  ~ Unless required by applicable law or agreed to in writing, software\r
-  ~ distributed under the License is distributed on an "AS IS" BASIS,\r
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-  ~ See the License for the specific language governing permissions and\r
-  ~ limitations under the License.\r
-  -->\r
-\r
-<configuration>\r
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">\r
-        <!-- encoders are assigned the type\r
-             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->\r
-        <encoder>\r
-            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>\r
-        </encoder>\r
-    </appender>\r
-\r
-\r
-    <logger name="org.springframework" level="warn"/>\r
-    <logger name="org.hibernate" level="info"/>\r
-    <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>\r
-\r
-    <root level="warn">\r
-        <appender-ref ref="STDOUT"/>\r
-    </root>\r
-\r
-</configuration>\r
+<!--
+  ~ Copyright © 2017-2018 AT&T Intellectual Property.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{50} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+
+    <logger name="org.springframework" level="warn"/>
+    <logger name="org.hibernate" level="info"/>
+    <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+    <root level="warn">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
index 08bc6c3..b74b7e4 100644 (file)
@@ -54,6 +54,10 @@ fun String.asJsonPrimitive(): TextNode {
     return TextNode(this)
 }
 
+inline fun <reified T : Any> String.jsonAsType(): T {
+    return JacksonUtils.readValue<T>(this.trim())
+}
+
 // If you know the string is json content, then use the function directly
 fun String.jsonAsJsonType(): JsonNode {
     return JacksonUtils.jsonNode(this.trim())