Publish execution input/output into Kafka topics 82/104182/16
authorJulien Fontaine <julien.fontaine@bell.ca>
Mon, 23 Mar 2020 18:33:40 +0000 (14:33 -0400)
committerJulien Fontaine <julien.fontaine@bell.ca>
Wed, 8 Apr 2020 23:02:14 +0000 (19:02 -0400)
- Creation and implementation of PublishAudit services (NoAudit,Kafka)
- Conceal of sensitive data (request input parameters)
- Update ExecutionServiceHandler tests
- Update application.properties file

Issue-ID: CCSDK-2183
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Ifc28a91ce588f5ed0010acb4ed22b64429c6c1a0

ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
ms/blueprintsprocessor/application/src/test/resources/application.properties
ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/api/data/BlueprintProcessorData.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties

index 5beebd8..ad38883 100755 (executable)
@@ -129,13 +129,20 @@ blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
 blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
 blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
 blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
-blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
+blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
 blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
 
-blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
+# Kafka audit service Configurations
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
 
 # Message prioritization kakfa properties, Enable if Prioritization service is needed
 # Deploy message-prioritization function along with blueprintsprocessor application.
index cb34193..c6e957b 100644 (file)
@@ -74,14 +74,26 @@ blueprintsprocessor.cliExecutor.enabled=true
 blueprintsprocessor.remoteScriptCommand.enabled=false
 
 
-# Kafka-message-lib Configuration
-blueprintsprocessor.messageclient.self-service-api.topic=producer.t
-blueprintsprocessor.messageclient.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageclient.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageclient.self-service-api.consumerTopic=receiver.t
-blueprintsprocessor.messageclient.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageclient.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageclient.self-service-api.kafkaEnable=false
+# Kafka-message-lib Configurations
+blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+
+# Kafka audit service Configurations
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
 
 
 endpoints.user.name=eHbVUbJAj4AG2522cSbrOQ==
index d949854..58d0f13 100644 (file)
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiModelProperty
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import java.util.Date
+import java.util.UUID
 
 /**
  * BlueprintProcessorData
@@ -33,7 +34,8 @@ import java.util.Date
  */
 
 open class ExecutionServiceInput {
-
+    @get:ApiModelProperty(required = false, hidden = true)
+    var correlationUUID: String = UUID.randomUUID().toString()
     @get:ApiModelProperty(required = true, value = "Headers providing request context.")
     lateinit var commonHeader: CommonHeader
     @get:ApiModelProperty(required = true, value = "Provide information about the action to execute.")
@@ -51,6 +53,8 @@ open class ExecutionServiceInput {
 }
 
 open class ExecutionServiceOutput {
+    @get:ApiModelProperty(required = false, hidden = true)
+    var correlationUUID: String? = null
     @get:ApiModelProperty(required = true, value = "Headers providing request context.")
     lateinit var commonHeader: CommonHeader
     @get:ApiModelProperty(required = true, value = "Provide information about the action to execute.")
index d0b4df8..49f2a50 100644 (file)
@@ -51,15 +51,13 @@ open class BluePrintProcessingKafkaConsumer(
 
     companion object {
         const val CONSUMER_SELECTOR = "self-service-api"
-        const val PRODUCER_SELECTOR = "self-service-api"
     }
 
     @EventListener(ApplicationReadyEvent::class)
     fun setupMessageListener() = runBlocking {
         try {
             log.info(
-                "Setting up message consumer($CONSUMER_SELECTOR) and " +
-                        "message producer($PRODUCER_SELECTOR)..."
+                "Setting up message consumer($CONSUMER_SELECTOR)"
             )
 
             /** Get the Message Consumer Service **/
@@ -74,18 +72,6 @@ open class BluePrintProcessingKafkaConsumer(
                 throw BluePrintProcessorException("failed to create consumer service ${e.message}")
             }
 
-            /** Get the Message Producer Service **/
-            val blueprintMessageProducerService = try {
-                bluePrintMessageLibPropertyService
-                    .blueprintMessageProducerService(PRODUCER_SELECTOR)
-            } catch (e: BluePrintProcessorException) {
-                val errorMsg = "Failed creating Kafka producer message service."
-                throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
-                        "Wrong Kafka selector provided or internal error in Kafka service.")
-            } catch (e: Exception) {
-                throw BluePrintProcessorException("failed to create producer service ${e.message}")
-            }
-
             launch {
                 /** Subscribe to the consumer topics */
                 val additionalConfig: MutableMap<String, Any> = hashMapOf()
@@ -96,10 +82,7 @@ open class BluePrintProcessingKafkaConsumer(
                             ph.register()
                             log.trace("Consumed Message : $message")
                             val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
-                            val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
-                            // TODO("In future, Message publisher configuration vary with respect to request")
-                            /** Send the response message */
-                            blueprintMessageProducerService.sendMessage(executionServiceOutput)
+                            executionServiceHandler.doProcess(executionServiceInput)
                         } catch (e: Exception) {
                             log.error("failed in processing the consumed message : $message", e)
                         } finally {
@@ -110,8 +93,7 @@ open class BluePrintProcessingKafkaConsumer(
             }
         } catch (e: Exception) {
             log.error(
-                "failed to start message consumer($CONSUMER_SELECTOR) and " +
-                        "message producer($PRODUCER_SELECTOR) ", e
+                "failed to start message consumer($CONSUMER_SELECTOR) ", e
             )
         }
     }
@@ -120,8 +102,7 @@ open class BluePrintProcessingKafkaConsumer(
     fun shutdownMessageListener() = runBlocking {
         try {
             log.info(
-                "Shutting down message consumer($CONSUMER_SELECTOR) and " +
-                        "message producer($PRODUCER_SELECTOR)..."
+                "Shutting down message consumer($CONSUMER_SELECTOR)"
             )
             blueprintMessageConsumerService.shutDown()
             ph.arriveAndAwaitAdvance()
index 9524e37..74c4b00 100644 (file)
@@ -44,7 +44,8 @@ class ExecutionServiceHandler(
     private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
     private val blueprintsProcessorCatalogService: BluePrintCatalogService,
     private val bluePrintWorkflowExecutionService:
-    BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>
+    BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+    private val publishAuditService: PublishAuditService
 ) {
 
     private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
@@ -67,33 +68,44 @@ class ExecutionServiceHandler(
                 responseObserver.onNext(executionServiceOutput.toProto())
                 responseObserver.onCompleted()
             }
-            else -> responseObserver.onNext(
-                response(
-                    executionServiceInput,
-                    "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
-                    true
-                ).toProto()
-            )
+            else -> {
+                publishAuditService.publish(executionServiceInput)
+                val executionServiceOutput = response(
+                        executionServiceInput,
+                        "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
+                        true
+                )
+                publishAuditService.publish(executionServiceOutput)
+                responseObserver.onNext(
+                        executionServiceOutput.toProto()
+                )
+            }
         }
     }
 
     suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         val requestId = executionServiceInput.commonHeader.requestId
-        log.info("processing request id $requestId")
         val actionIdentifiers = executionServiceInput.actionIdentifiers
         val blueprintName = actionIdentifiers.blueprintName
         val blueprintVersion = actionIdentifiers.blueprintVersion
+
+        lateinit var executionServiceOutput: ExecutionServiceOutput
+
+        log.info("processing request id $requestId")
+
         try {
+            publishAuditService.publish(executionServiceInput)
+
             /** Check Blueprint is needed for this request */
             if (checkServiceFunction(executionServiceInput)) {
-                return executeServiceFunction(executionServiceInput)
+                executionServiceOutput = executeServiceFunction(executionServiceInput)
             } else {
                 val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
                 log.info("blueprint base path $basePath")
 
                 val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
 
-                val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+                executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
                     blueprintRuntimeService,
                     executionServiceInput, hashMapOf()
                 )
@@ -101,14 +113,16 @@ class ExecutionServiceHandler(
                 val errors = blueprintRuntimeService.getBluePrintError().errors
                 if (errors.isNotEmpty()) {
                     val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
-                    setErrorStatus(errorMessage, output.status)
+                    setErrorStatus(errorMessage, executionServiceOutput.status)
                 }
-                return output
             }
         } catch (e: Exception) {
             log.error("fail processing request id $requestId", e)
-            return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
+            executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
         }
+
+        publishAuditService.publish(executionServiceOutput)
+        return executionServiceOutput
     }
 
     /** If the blueprint name is default, It means no blueprint is needed for the execution */
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
new file mode 100644 (file)
index 0000000..129e7a5
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright © 2020 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceResolutionConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.PropertyDefinitionUtils
+import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+import javax.annotation.PostConstruct
+
+/**
+ * Audit service used to produce execution service input and output message
+ * sent into dedicated kafka topics.
+ */
+@ConditionalOnProperty(
+        name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"],
+        havingValue = "true"
+)
+@Service
+class KafkaPublishAuditService(
+    private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+    private val blueprintsProcessorCatalogService: BluePrintCatalogService
+) : PublishAuditService {
+
+    private var inputInstance: BlueprintMessageProducerService? = null
+    private var outputInstance: BlueprintMessageProducerService? = null
+
+    private lateinit var correlationUUID: String
+
+    private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString())
+
+    companion object {
+        const val INPUT_SELECTOR = "self-service-api.audit.request"
+        const val OUTPUT_SELECTOR = "self-service-api.audit.response"
+    }
+
+    @PostConstruct
+    private fun init() {
+        log.info("Kakfa audit service is enabled")
+    }
+
+    /**
+     * Publish execution input into a kafka topic.
+     * The correlation UUID is used to link the input to its output.
+     * Sensitive data within the request are hidden.
+     */
+    override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+        this.correlationUUID = executionServiceInput.correlationUUID
+        val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
+        this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
+        this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+    }
+
+    /**
+     * Publish execution output into a kafka topic.
+     * The correlation UUID is used to link the output to its input.
+     * A correlation UUID is added to link the input to its output.
+     */
+    override fun publish(executionServiceOutput: ExecutionServiceOutput) {
+        executionServiceOutput.correlationUUID = this.correlationUUID
+        this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
+        this.outputInstance!!.sendMessage(executionServiceOutput)
+    }
+
+    /**
+     * Return the input kafka producer instance using a selector.
+     */
+    private fun getInputInstance(selector: String): BlueprintMessageProducerService = inputInstance ?: createInstance(selector)
+
+    /**
+     * Return the output kafka producer instance using a selector.
+     */
+    private fun getOutputInstance(selector: String): BlueprintMessageProducerService = outputInstance ?: createInstance(selector)
+
+    /**
+     * Create a kafka producer instance.
+     */
+    private fun createInstance(selector: String): BlueprintMessageProducerService {
+        log.info(
+                "Setting up message producer($selector)..."
+        )
+        return try {
+            bluePrintMessageLibPropertyService
+                    .blueprintMessageProducerService(selector)
+        } catch (e: Exception) {
+            throw BluePrintProcessorException("failed to create producer service ${e.message}")
+        }
+    }
+
+    /**
+     * Hide sensitive data in the request.
+     * Sensitive data are declared in the resource resolution mapping using
+     * the property metadata "log-protect" set to true.
+     */
+    private suspend fun hideSensitiveData(
+        executionServiceInput: ExecutionServiceInput
+    ): ExecutionServiceInput {
+
+        var clonedExecutionServiceInput = ExecutionServiceInput().apply {
+            correlationUUID = executionServiceInput.correlationUUID
+            commonHeader = executionServiceInput.commonHeader
+            actionIdentifiers = executionServiceInput.actionIdentifiers
+            payload = executionServiceInput.payload
+        }
+
+        val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName
+        val workflowName = clonedExecutionServiceInput.actionIdentifiers.actionName
+
+        if (blueprintName == "default") return clonedExecutionServiceInput
+
+        if (clonedExecutionServiceInput.payload
+                        .path("$workflowName-request").has("$workflowName-properties")) {
+
+            /** Retrieving sensitive input parameters */
+            val requestId = clonedExecutionServiceInput.commonHeader.requestId
+            val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
+
+            val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+
+            val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+            val blueprintContext = blueprintRuntimeService.bluePrintContext()
+
+            val nodeTemplateName = blueprintContext.workflowFirstStepNodeTemplate(workflowName)
+            val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+            val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+            val propertyAssignments: MutableMap<String, JsonNode> =
+                    blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+                            ?: hashMapOf()
+
+            val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+            val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNode!!, String::class.java)
+
+            /** Storing mapping entries with metadata log-protect set to true */
+            val sensitiveParameters: List<String> = artifactPrefixNames
+                    .map { "$it-mapping" }
+                    .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+                    .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+                    .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+                    .map { it.name }
+
+            /** Hiding sensitive input parameters from the request */
+            var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
+                    .path("$workflowName-request")
+                    .path("$workflowName-properties") as ObjectNode
+
+            sensitiveParameters.forEach { sensitiveParameter ->
+                if (workflowProperties.has(sensitiveParameter)) {
+                    workflowProperties.remove(sensitiveParameter)
+                    workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED)
+                }
+            }
+        }
+
+        return clonedExecutionServiceInput
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
new file mode 100644 (file)
index 0000000..3f78200
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2020 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+import javax.annotation.PostConstruct
+
+/**
+ * Default audit service when no audit publisher is defined, message aren't sent
+ */
+@ConditionalOnProperty(
+        name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"],
+        havingValue = "false"
+)
+@Service
+class NoPublishAuditService : PublishAuditService {
+
+    val log = logger(NoPublishAuditService::class)
+
+    @PostConstruct
+    fun init() {
+        log.info("Audit service is disabled")
+    }
+
+    override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+    }
+
+    override fun publish(executionServiceOutput: ExecutionServiceOutput) {
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
new file mode 100644 (file)
index 0000000..535a5ea
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright © 2020 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
+
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+
+interface PublishAuditService {
+    suspend fun publish(executionServiceInput: ExecutionServiceInput)
+    fun publish(executionServiceOutput: ExecutionServiceOutput)
+}
index b21f968..37f7861 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
+import io.mockk.verify
+import io.mockk.coVerify
+import io.mockk.Runs
+import io.mockk.coEvery
+import io.mockk.just
 import io.mockk.mockk
 import kotlinx.coroutines.runBlocking
 import org.junit.Before
@@ -23,6 +28,7 @@ import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -65,13 +71,52 @@ class ExecutionServiceHandlerTest {
             }
         }
         runBlocking {
-            val executionServiceHandler = ExecutionServiceHandler(mockk(), mockk(), mockk())
+            val executionServiceHandler = ExecutionServiceHandler(mockk(), mockk(), mockk(), mockk())
             val isServiceFunction = executionServiceHandler.checkServiceFunction(executionServiceInput)
             assertTrue(isServiceFunction, "failed to checkServiceFunction")
             val executionServiceOutput = executionServiceHandler.executeServiceFunction(executionServiceInput)
             assertNotNull(executionServiceOutput, "failed to get executionServiceOutput")
         }
     }
+
+    @Test
+    fun testPublishAuditFunction() {
+        val executionServiceInput = ExecutionServiceInput().apply {
+            commonHeader = CommonHeader().apply {
+                requestId = "1234"
+                subRequestId = "1234-12"
+                originatorId = "cds-test"
+            }
+            actionIdentifiers = ActionIdentifiers().apply {
+                blueprintName = "default"
+                blueprintVersion = "1.0.0"
+                actionName = "mock-service-action"
+            }
+        }
+
+        val publishAuditService = mockk<KafkaPublishAuditService>(relaxed = true)
+        val executionServiceHandler = ExecutionServiceHandler(
+                mockk(),
+                mockk(),
+                mockk(),
+                publishAuditService
+        )
+
+        coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs
+
+        var executionServiceOutput: ExecutionServiceOutput? = null
+        runBlocking {
+            executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+        }
+
+        coVerify {
+            publishAuditService.publish(executionServiceInput)
+        }
+
+        verify {
+            publishAuditService.publish(executionServiceOutput!!)
+        }
+    }
 }
 
 @Service("mock-service-action")
index 6003df1..fb2189f 100644 (file)
@@ -37,16 +37,23 @@ error.catalog.errorDefinitionDir=./../../../application/src/test/resources/
 blueprints.processor.functions.python.executor.executionPath=./../../../../components/scripts/python/ccsdk_blueprints
 blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints
 
-# Kafka-message-lib Configuration
+# Kafka-message-lib Configurations
 blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
 blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
 blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
 blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=10
+blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
+blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
+blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+
+# Kafka audit service Configurations
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
 
-blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t