Merge "Kafka Audit Service : CorrelationUUID from request is not matching the correct...
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / KafkaPublishAuditService.kt
1 /*
2  * Copyright © 2020 Bell Canada
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.fasterxml.jackson.databind.node.ObjectNode
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceResolutionConstants
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
26 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
27 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
28 import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
29 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
30 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
31 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
32 import org.onap.ccsdk.cds.controllerblueprints.core.utils.PropertyDefinitionUtils
33 import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment
34 import org.slf4j.LoggerFactory
35 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
36 import org.springframework.stereotype.Service
37 import javax.annotation.PostConstruct
38
/**
 * Audit service used to produce execution service input and output messages
 * sent into dedicated kafka topics.
 *
 * The bean is only registered when the property
 * `blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable` is set to `true`.
 * Message producers are created on first use and then reused for later publications.
 */
@ConditionalOnProperty(
    name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"],
    havingValue = "true"
)
@Service
class KafkaPublishAuditService(
    private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
    private val blueprintsProcessorCatalogService: BluePrintCatalogService
) : PublishAuditService {
    // Cached producers, populated by the first call to the matching publish() overload.
    private var inputInstance: BlueprintMessageProducerService? = null
    private var outputInstance: BlueprintMessageProducerService? = null

    // Use the Java class so the logger is named after the fully qualified class name;
    // KClass.toString() would prefix the name with "class " and escape package-level log configuration.
    private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.java)

    companion object {
        const val INPUT_SELECTOR = "self-service-api.audit.request"
        const val OUTPUT_SELECTOR = "self-service-api.audit.response"
    }

    @PostConstruct
    private fun init() {
        // NOTE(review): message text (including its spelling) is kept unchanged on purpose,
        // in case log consumers match on it.
        log.info("Kakfa audit service is enabled")
    }

    /**
     * Publish execution input into a kafka topic.
     * The correlation UUID is used to link the input to its output.
     * Sensitive data within the request are hidden; the caller's object is not modified,
     * a redacted copy is sent instead.
     */
    override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
        val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
        val producer = getInputInstance(INPUT_SELECTOR).also { inputInstance = it }
        producer.sendMessage(secureExecutionServiceInput)
    }

    /**
     * Publish execution output into a kafka topic.
     * The given correlation UUID is written into [executionServiceOutput] (the object is
     * updated in place) so that the output can be linked to its input.
     */
    override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
        executionServiceOutput.correlationUUID = correlationUUID
        val producer = getOutputInstance(OUTPUT_SELECTOR).also { outputInstance = it }
        producer.sendMessage(executionServiceOutput)
    }

    /**
     * Return the cached input kafka producer, or create one for [selector] if none is cached yet.
     */
    private fun getInputInstance(selector: String): BlueprintMessageProducerService =
        inputInstance ?: createInstance(selector)

    /**
     * Return the cached output kafka producer, or create one for [selector] if none is cached yet.
     */
    private fun getOutputInstance(selector: String): BlueprintMessageProducerService =
        outputInstance ?: createInstance(selector)

    /**
     * Create a kafka producer instance for [selector].
     *
     * @throws BluePrintProcessorException if the producer cannot be created.
     */
    private fun createInstance(selector: String): BlueprintMessageProducerService {
        log.info(
            "Setting up message producer($selector)..."
        )
        return try {
            bluePrintMessageLibPropertyService
                .blueprintMessageProducerService(selector)
        } catch (e: Exception) {
            throw BluePrintProcessorException("failed to create producer service ${e.message}")
        }
    }

    /**
     * Hide sensitive data in the request.
     * Sensitive data are declared in the resource resolution mapping using
     * the property metadata "log-protect" set to true.
     *
     * Works on a copy: the payload is deep-copied, the other fields are shared with the
     * original request. The copy is returned untouched when the blueprint name is "default"
     * or when the payload has no `<workflow>-request/<workflow>-properties` JSON object.
     *
     * @return a copy of [executionServiceInput] whose sensitive workflow properties are redacted.
     */
    private suspend fun hideSensitiveData(
        executionServiceInput: ExecutionServiceInput
    ): ExecutionServiceInput {

        val clonedExecutionServiceInput = ExecutionServiceInput().apply {
            correlationUUID = executionServiceInput.correlationUUID
            commonHeader = executionServiceInput.commonHeader
            actionIdentifiers = executionServiceInput.actionIdentifiers
            payload = executionServiceInput.payload.deepCopy()
            stepData = executionServiceInput.stepData
        }

        val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName
        val workflowName = clonedExecutionServiceInput.actionIdentifiers.actionName

        if (blueprintName == "default") return clonedExecutionServiceInput

        // path() never returns null: a missing field yields a "missing" node, which is not an
        // ObjectNode. The safe cast therefore also covers properties that are present but are
        // not a JSON object (explicit null, text, array, ...), which cannot hold named parameters.
        val workflowPropertiesNode: JsonNode = clonedExecutionServiceInput.payload
            .path("$workflowName-request")
            .path("$workflowName-properties")
        val workflowProperties = workflowPropertiesNode as? ObjectNode
            ?: return clonedExecutionServiceInput

        /** Hiding sensitive input parameters from the request */
        findSensitiveParameters(clonedExecutionServiceInput).forEach { sensitiveParameter ->
            if (workflowProperties.has(sensitiveParameter)) {
                workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
            }
        }

        return clonedExecutionServiceInput
    }

    /**
     * Collect the names of the resource assignments flagged "log-protect" in the mapping
     * artifacts referenced by the first step of the requested workflow.
     *
     * @return the sensitive parameter names; empty when the first step declares no
     * artifact prefix names input.
     */
    private suspend fun findSensitiveParameters(executionServiceInput: ExecutionServiceInput): List<String> {
        val requestId = executionServiceInput.commonHeader.requestId
        val blueprintName = executionServiceInput.actionIdentifiers.blueprintName
        val blueprintVersion = executionServiceInput.actionIdentifiers.blueprintVersion
        val workflowName = executionServiceInput.actionIdentifiers.actionName

        val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)

        val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
        val blueprintContext = blueprintRuntimeService.bluePrintContext()

        val nodeTemplateName = blueprintContext.workflowFirstStepNodeTemplate(workflowName)
        val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
        val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)

        val propertyAssignments: Map<String, JsonNode> =
            blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
                ?: emptyMap()

        // The first step may not declare any artifact prefix names; in that case there is no
        // mapping to inspect and therefore nothing declared as sensitive.
        val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
        if (artifactPrefixNamesNode == null) {
            log.debug("No artifact prefix names found for workflow($workflowName), no sensitive data to hide")
            return emptyList()
        }
        val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNode, String::class.java)

        /** Storing mapping entries with metadata log-protect set to true */
        return artifactPrefixNames
            .map { "$it-mapping" }
            .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
            .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
            .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
            .map { it.name }
    }
}