Make use of Kafka Key for Audit service and Kafka listener
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / KafkaPublishAuditService.kt
index 1c5d47c..93885bf 100644 (file)
@@ -23,10 +23,11 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceResolutionConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.PropertyAssignmentService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.PropertyDefinitionUtils
@@ -51,7 +52,6 @@ class KafkaPublishAuditService(
 ) : PublishAuditService {
     private var inputInstance: BlueprintMessageProducerService? = null
     private var outputInstance: BlueprintMessageProducerService? = null
-    private lateinit var correlationUUID: String
     private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString())
 
     companion object {
@@ -69,11 +69,18 @@ class KafkaPublishAuditService(
      * The correlation UUID is used to link the input to its output.
      * Sensitive data within the request are hidden.
      */
-    override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
-        this.correlationUUID = executionServiceInput.correlationUUID
+    override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
         val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
-        this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
-        this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+        val key = secureExecutionServiceInput.actionIdentifiers.blueprintName
+        try {
+            this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
+            this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
+        } catch (e: Exception) {
+            var errMsg =
+                    if (e.message != null) "ERROR : ${e.message}"
+                    else "ERROR : Failed to send execution request to Kafka."
+            log.error(errMsg)
+        }
     }
 
     /**
@@ -81,10 +88,18 @@ class KafkaPublishAuditService(
      * The correlation UUID is used to link the output to its input.
      * A correlation UUID is added to link the input to its output.
      */
-    override fun publish(executionServiceOutput: ExecutionServiceOutput) {
-        executionServiceOutput.correlationUUID = this.correlationUUID
-        this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
-        this.outputInstance!!.sendMessage(executionServiceOutput)
+    override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+        executionServiceOutput.correlationUUID = correlationUUID
+        val key = executionServiceOutput.actionIdentifiers.blueprintName
+        try {
+            this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
+            this.outputInstance!!.sendMessage(key, executionServiceOutput)
+        } catch (e: Exception) {
+            var errMsg =
+                if (e.message != null) "ERROR : $e"
+                else "ERROR : Failed to send execution request to Kafka."
+            log.error(errMsg)
+        }
     }
 
     /**
@@ -101,15 +116,8 @@ class KafkaPublishAuditService(
      * Create a kafka producer instance.
      */
     private fun createInstance(selector: String): BlueprintMessageProducerService {
-        log.info(
-                "Setting up message producer($selector)..."
-        )
-        return try {
-            bluePrintMessageLibPropertyService
-                    .blueprintMessageProducerService(selector)
-        } catch (e: Exception) {
-            throw BluePrintProcessorException("failed to create producer service ${e.message}")
-        }
+        log.info("Setting up message producer($selector)...")
+        return bluePrintMessageLibPropertyService.blueprintMessageProducerService(selector)
     }
 
     /**
@@ -134,49 +142,73 @@ class KafkaPublishAuditService(
 
         if (blueprintName == "default") return clonedExecutionServiceInput
 
-        if (clonedExecutionServiceInput.payload
-                        .path("$workflowName-request").has("$workflowName-properties")) {
-
-            /** Retrieving sensitive input parameters */
-            val requestId = clonedExecutionServiceInput.commonHeader.requestId
-            val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
-
-            val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-
-            val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
-            val blueprintContext = blueprintRuntimeService.bluePrintContext()
-
-            val nodeTemplateName = blueprintContext.workflowFirstStepNodeTemplate(workflowName)
-            val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
-            val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
-
-            val propertyAssignments: MutableMap<String, JsonNode> =
-                    blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
-                            ?: hashMapOf()
-
-            val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
-            val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNode!!, String::class.java)
-
-            /** Storing mapping entries with metadata log-protect set to true */
-            val sensitiveParameters: List<String> = artifactPrefixNames
-                    .map { "$it-mapping" }
-                    .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
-                    .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
-                    .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
-                    .map { it.name }
-
-            /** Hiding sensitive input parameters from the request */
-            var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
-                    .path("$workflowName-request")
-                    .path("$workflowName-properties") as ObjectNode
-
-            sensitiveParameters.forEach { sensitiveParameter ->
-                if (workflowProperties.has(sensitiveParameter)) {
-                    workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+        try {
+            if (clonedExecutionServiceInput.payload
+                            .path("$workflowName-request").has("$workflowName-properties")) {
+
+                /** Retrieving sensitive input parameters */
+                val requestId = clonedExecutionServiceInput.commonHeader.requestId
+                val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
+
+                val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+
+                val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+                val blueprintContext = blueprintRuntimeService.bluePrintContext()
+
+                /** Looking for node templates defined as component-resource-resolution */
+                val nodeTemplates = blueprintContext.nodeTemplates()
+                nodeTemplates!!.forEach { nodeTemplate ->
+                    val nodeTemplateName = nodeTemplate.key
+                    val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
+                    if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
+                        val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+                        val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+                        val propertyAssignments: MutableMap<String, JsonNode> =
+                                blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+                                        ?: hashMapOf()
+
+                        /** Getting values define in artifact-prefix-names */
+                        val input = executionServiceInput.payload.get("$workflowName-request")
+                        blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
+                        val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+                        val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
+                        val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
+                                BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
+                                nodeTemplateName,
+                                ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
+                                artifactPrefixNamesNode!!)
+
+                        val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
+
+                        /** Storing mapping entries with metadata log-protect set to true */
+                        val sensitiveParameters: List<String> = artifactPrefixNames
+                                .map { "$it-mapping" }
+                                .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+                                .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+                                .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+                                .map { it.name }
+
+                        /** Hiding sensitive input parameters from the request */
+                        var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
+                                .path("$workflowName-request")
+                                .path("$workflowName-properties") as ObjectNode
+
+                        sensitiveParameters.forEach { sensitiveParameter ->
+                            if (workflowProperties.has(sensitiveParameter)) {
+                                workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+                            }
+                        }
+                    }
                 }
             }
+        } catch (e: Exception) {
+        val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
+        log.error(errMsg, e)
+        clonedExecutionServiceInput.payload.replace(
+                "$workflowName-request",
+                "$errMsg $e".asJsonPrimitive())
         }
-
         return clonedExecutionServiceInput
     }
 }