Make use of Kafka Key for Audit service and Kafka listener
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / KafkaPublishAuditService.kt
index 9ac11c7..93885bf 100644 (file)
@@ -24,10 +24,10 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.PropertyAssignmentService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.PropertyDefinitionUtils
@@ -69,10 +69,18 @@ class KafkaPublishAuditService(
      * The correlation UUID is used to link the input to its output.
      * Sensitive data within the request are hidden.
      */
-    override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+    override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
         val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
-        this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
-        this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+        val key = secureExecutionServiceInput.actionIdentifiers.blueprintName
+        try {
+            this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
+            this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
+        } catch (e: Exception) {
+            var errMsg =
+                    if (e.message != null) "ERROR : ${e.message}"
+                    else "ERROR : Failed to send execution request to Kafka."
+            log.error(errMsg)
+        }
     }
 
     /**
@@ -80,10 +88,18 @@ class KafkaPublishAuditService(
      * The correlation UUID is used to link the output to its input.
      * A correlation UUID is added to link the input to its output.
      */
-    override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+    override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
         executionServiceOutput.correlationUUID = correlationUUID
-        this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
-        this.outputInstance!!.sendMessage(executionServiceOutput)
+        val key = executionServiceOutput.actionIdentifiers.blueprintName
+        try {
+            this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
+            this.outputInstance!!.sendMessage(key, executionServiceOutput)
+        } catch (e: Exception) {
+            var errMsg =
+                if (e.message != null) "ERROR : $e"
+                else "ERROR : Failed to send execution request to Kafka."
+            log.error(errMsg)
+        }
     }
 
     /**
@@ -100,15 +116,8 @@ class KafkaPublishAuditService(
      * Create a kafka producer instance.
      */
     private fun createInstance(selector: String): BlueprintMessageProducerService {
-        log.info(
-                "Setting up message producer($selector)..."
-        )
-        return try {
-            bluePrintMessageLibPropertyService
-                    .blueprintMessageProducerService(selector)
-        } catch (e: Exception) {
-            throw BluePrintProcessorException("failed to create producer service ${e.message}")
-        }
+        log.info("Setting up message producer($selector)...")
+        return bluePrintMessageLibPropertyService.blueprintMessageProducerService(selector)
     }
 
     /**
@@ -133,56 +142,73 @@ class KafkaPublishAuditService(
 
         if (blueprintName == "default") return clonedExecutionServiceInput
 
-        if (clonedExecutionServiceInput.payload
-                        .path("$workflowName-request").has("$workflowName-properties")) {
-
-            /** Retrieving sensitive input parameters */
-            val requestId = clonedExecutionServiceInput.commonHeader.requestId
-            val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
-
-            val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-
-            val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
-            val blueprintContext = blueprintRuntimeService.bluePrintContext()
-
-            /** Looking for node templates defined as component-resource-resolution */
-            val nodeTemplates = blueprintContext.nodeTemplates()
-            nodeTemplates!!.forEach { nodeTemplate ->
-                val nodeTemplateName = nodeTemplate.key
-                val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
-                if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
-                    val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
-                    val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
-
-                    val propertyAssignments: MutableMap<String, JsonNode> =
-                            blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
-                                    ?: hashMapOf()
-
-                    val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
-                    val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNode!!, String::class.java)
-
-                    /** Storing mapping entries with metadata log-protect set to true */
-                    val sensitiveParameters: List<String> = artifactPrefixNames
-                            .map { "$it-mapping" }
-                            .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
-                            .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
-                            .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
-                            .map { it.name }
-
-                    /** Hiding sensitive input parameters from the request */
-                    var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
-                            .path("$workflowName-request")
-                            .path("$workflowName-properties") as ObjectNode
-
-                    sensitiveParameters.forEach { sensitiveParameter ->
-                        if (workflowProperties.has(sensitiveParameter)) {
-                            workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+        try {
+            if (clonedExecutionServiceInput.payload
+                            .path("$workflowName-request").has("$workflowName-properties")) {
+
+                /** Retrieving sensitive input parameters */
+                val requestId = clonedExecutionServiceInput.commonHeader.requestId
+                val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
+
+                val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+
+                val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+                val blueprintContext = blueprintRuntimeService.bluePrintContext()
+
+                /** Looking for node templates defined as component-resource-resolution */
+                val nodeTemplates = blueprintContext.nodeTemplates()
+                nodeTemplates!!.forEach { nodeTemplate ->
+                    val nodeTemplateName = nodeTemplate.key
+                    val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
+                    if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
+                        val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+                        val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+                        val propertyAssignments: MutableMap<String, JsonNode> =
+                                blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+                                        ?: hashMapOf()
+
+                        /** Getting values define in artifact-prefix-names */
+                        val input = executionServiceInput.payload.get("$workflowName-request")
+                        blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
+                        val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+                        val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
+                        val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
+                                BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
+                                nodeTemplateName,
+                                ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
+                                artifactPrefixNamesNode!!)
+
+                        val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
+
+                        /** Storing mapping entries with metadata log-protect set to true */
+                        val sensitiveParameters: List<String> = artifactPrefixNames
+                                .map { "$it-mapping" }
+                                .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+                                .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+                                .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+                                .map { it.name }
+
+                        /** Hiding sensitive input parameters from the request */
+                        var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
+                                .path("$workflowName-request")
+                                .path("$workflowName-properties") as ObjectNode
+
+                        sensitiveParameters.forEach { sensitiveParameter ->
+                            if (workflowProperties.has(sensitiveParameter)) {
+                                workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+                            }
                         }
                     }
                 }
             }
+        } catch (e: Exception) {
+        val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
+        log.error(errMsg, e)
+        clonedExecutionServiceInput.payload.replace(
+                "$workflowName-request",
+                "$errMsg $e".asJsonPrimitive())
         }
-
         return clonedExecutionServiceInput
     }
 }