Kafka Audit Service : Improve error handling and miscellaneous refactoring 28/109328/3
authorJulien Fontaine <julien.fontaine@bell.ca>
Thu, 18 Jun 2020 22:16:38 +0000 (18:16 -0400)
committerJulien Fontaine <julien.fontaine@bell.ca>
Mon, 22 Jun 2020 18:46:22 +0000 (14:46 -0400)
When Kafka Audit Service fails it no longer stops Blueprint Processor execution

* Add error handling when trying to hide sensitive data
* Add error handling when trying to send kafka message
* Set timeout for blocking loop when sending messages with kafka producer
-> When broker is not available producer tries to reconnect in a blocking loop
* Refactor Audit Service interface to give more explict name for publish methods
* Modify publishExecutionOutput() to a non-blocking function

Issue-ID: CCSDK-2459
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I809a5f34f81889aa9eed499608348f149984bc38

ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt

index b07d643..d76621c 100644 (file)
@@ -51,17 +51,18 @@ abstract class MessageProducerProperties : CommonProperties()
 open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
 
     var clientId: String? = null
-    // strongest producing guarantee
-    var acks: String = "all"
-    var retries: Int = 0
-    // ensure we don't push duplicates
-    var enableIdempotence: Boolean = true
+    var acks: String = "all" // strongest producing guarantee
+    var maxBlockMs: Int = 250 // max blocking time in ms to send a message
+    var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
+    var enableIdempotence: Boolean = true // ensure we don't push duplicates
 
     override fun getConfig(): HashMap<String, Any> {
         val configProps = super.getConfig()
         configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
         configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
         configProps[ProducerConfig.ACKS_CONFIG] = acks
+        configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
+        configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
         configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
         if (clientId != null) {
             configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
index c659fdb..e9bc5d8 100644 (file)
@@ -159,9 +159,13 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro
 
     fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks)
 
-    fun retries(retries: Int) = retries(retries.asJsonPrimitive())
+    fun maxBlockMs(maxBlockMs: Int) = maxBlockMs(maxBlockMs.asJsonPrimitive())
 
-    fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries)
+    fun maxBlockMs(maxBlockMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::maxBlockMs, maxBlockMs)
+
+    fun reconnectBackOffMs(reconnectBackOffMs: Int) = reconnectBackOffMs(reconnectBackOffMs.asJsonPrimitive())
+
+    fun reconnectBackOffMs(reconnectBackOffMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::reconnectBackOffMs, reconnectBackOffMs)
 
     fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive())
 
index 612a57d..b1af230 100644 (file)
@@ -35,7 +35,8 @@ class MessagePropertiesDSLTest {
                         bootstrapServers("sample-bootstrapServers")
                         clientId("sample-client-id")
                         acks("all")
-                        retries(3)
+                        maxBlockMs(0)
+                        reconnectBackOffMs(60 * 60 * 1000)
                         enableIdempotence(true)
                         topic("sample-topic")
                         truststore("/path/to/truststore.jks")
index da73949..537dab1 100644 (file)
@@ -100,6 +100,8 @@ open class BlueprintMessageProducerServiceTest {
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
                 ProducerConfig.ACKS_CONFIG to "all",
+                ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
+                ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
                 ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
                 ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
                 CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
index e9d0b7b..6c62aae 100644 (file)
@@ -69,13 +69,13 @@ class ExecutionServiceHandler(
                 responseObserver.onCompleted()
             }
             else -> {
-                publishAuditService.publish(executionServiceInput)
+                publishAuditService.publishExecutionInput(executionServiceInput)
                 val executionServiceOutput = response(
                         executionServiceInput,
                         "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
                         true
                 )
-                publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+                publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
                 responseObserver.onNext(
                         executionServiceOutput.toProto()
                 )
@@ -93,9 +93,9 @@ class ExecutionServiceHandler(
 
         log.info("processing request id $requestId")
 
-        try {
-            publishAuditService.publish(executionServiceInput)
+        publishAuditService.publishExecutionInput(executionServiceInput)
 
+        try {
             /** Check Blueprint is needed for this request */
             if (checkServiceFunction(executionServiceInput)) {
                 executionServiceOutput = executeServiceFunction(executionServiceInput)
@@ -121,7 +121,7 @@ class ExecutionServiceHandler(
             executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
         }
 
-        publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+        publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
         return executionServiceOutput
     }
 
index 6ff2179..fca7398 100644 (file)
@@ -24,7 +24,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
@@ -70,10 +69,17 @@ class KafkaPublishAuditService(
      * The correlation UUID is used to link the input to its output.
      * Sensitive data within the request are hidden.
      */
-    override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+    override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
         val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
-        this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
-        this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+        try {
+            this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
+            this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+        } catch (e: Exception) {
+            var errMsg =
+                    if (e.message != null) "ERROR : ${e.message}"
+                    else "ERROR : Failed to send execution request to Kafka."
+            log.error(errMsg)
+        }
     }
 
     /**
@@ -81,10 +87,17 @@ class KafkaPublishAuditService(
      * The correlation UUID is used to link the output to its input.
      * A correlation UUID is added to link the input to its output.
      */
-    override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+    override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
         executionServiceOutput.correlationUUID = correlationUUID
-        this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
-        this.outputInstance!!.sendMessage(executionServiceOutput)
+        try {
+            this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
+            this.outputInstance!!.sendMessage(executionServiceOutput)
+        } catch (e: Exception) {
+            var errMsg =
+                if (e.message != null) "ERROR : $e"
+                else "ERROR : Failed to send execution request to Kafka."
+            log.error(errMsg)
+        }
     }
 
     /**
@@ -101,15 +114,8 @@ class KafkaPublishAuditService(
      * Create a kafka producer instance.
      */
     private fun createInstance(selector: String): BlueprintMessageProducerService {
-        log.info(
-                "Setting up message producer($selector)..."
-        )
-        return try {
-            bluePrintMessageLibPropertyService
-                    .blueprintMessageProducerService(selector)
-        } catch (e: Exception) {
-            throw BluePrintProcessorException("failed to create producer service ${e.message}")
-        }
+        log.info("Setting up message producer($selector)...")
+        return bluePrintMessageLibPropertyService.blueprintMessageProducerService(selector)
     }
 
     /**
@@ -134,66 +140,73 @@ class KafkaPublishAuditService(
 
         if (blueprintName == "default") return clonedExecutionServiceInput
 
-        if (clonedExecutionServiceInput.payload
-                        .path("$workflowName-request").has("$workflowName-properties")) {
-
-            /** Retrieving sensitive input parameters */
-            val requestId = clonedExecutionServiceInput.commonHeader.requestId
-            val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
-
-            val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-
-            val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
-            val blueprintContext = blueprintRuntimeService.bluePrintContext()
-
-            /** Looking for node templates defined as component-resource-resolution */
-            val nodeTemplates = blueprintContext.nodeTemplates()
-            nodeTemplates!!.forEach { nodeTemplate ->
-                val nodeTemplateName = nodeTemplate.key
-                val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
-                if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
-                    val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
-                    val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
-
-                    val propertyAssignments: MutableMap<String, JsonNode> =
-                            blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
-                                    ?: hashMapOf()
-
-                    /** Getting values define in artifact-prefix-names */
-                    val input = executionServiceInput.payload.get("$workflowName-request")
-                    blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
-                    val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
-                    val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
-                    val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
-                            BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
-                            nodeTemplateName,
-                            ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
-                            artifactPrefixNamesNode!!)
-
-                    val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
-
-                    /** Storing mapping entries with metadata log-protect set to true */
-                    val sensitiveParameters: List<String> = artifactPrefixNames
-                            .map { "$it-mapping" }
-                            .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
-                            .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
-                            .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
-                            .map { it.name }
-
-                    /** Hiding sensitive input parameters from the request */
-                    var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
-                            .path("$workflowName-request")
-                            .path("$workflowName-properties") as ObjectNode
-
-                    sensitiveParameters.forEach { sensitiveParameter ->
-                        if (workflowProperties.has(sensitiveParameter)) {
-                            workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+        try {
+            if (clonedExecutionServiceInput.payload
+                            .path("$workflowName-request").has("$workflowName-properties")) {
+
+                /** Retrieving sensitive input parameters */
+                val requestId = clonedExecutionServiceInput.commonHeader.requestId
+                val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion
+
+                val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+
+                val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+                val blueprintContext = blueprintRuntimeService.bluePrintContext()
+
+                /** Looking for node templates defined as component-resource-resolution */
+                val nodeTemplates = blueprintContext.nodeTemplates()
+                nodeTemplates!!.forEach { nodeTemplate ->
+                    val nodeTemplateName = nodeTemplate.key
+                    val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
+                    if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
+                        val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+                        val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+                        val propertyAssignments: MutableMap<String, JsonNode> =
+                                blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+                                        ?: hashMapOf()
+
+                        /** Getting values define in artifact-prefix-names */
+                        val input = executionServiceInput.payload.get("$workflowName-request")
+                        blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
+                        val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+                        val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
+                        val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
+                                BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
+                                nodeTemplateName,
+                                ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
+                                artifactPrefixNamesNode!!)
+
+                        val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
+
+                        /** Storing mapping entries with metadata log-protect set to true */
+                        val sensitiveParameters: List<String> = artifactPrefixNames
+                                .map { "$it-mapping" }
+                                .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+                                .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+                                .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+                                .map { it.name }
+
+                        /** Hiding sensitive input parameters from the request */
+                        var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
+                                .path("$workflowName-request")
+                                .path("$workflowName-properties") as ObjectNode
+
+                        sensitiveParameters.forEach { sensitiveParameter ->
+                            if (workflowProperties.has(sensitiveParameter)) {
+                                workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+                            }
                         }
                     }
                 }
             }
+        } catch (e: Exception) {
+        val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
+        log.error(errMsg, e)
+        clonedExecutionServiceInput.payload.replace(
+                "$workflowName-request",
+                "$errMsg $e".asJsonPrimitive())
         }
-
         return clonedExecutionServiceInput
     }
 }
index eb66e41..6ad73d8 100644 (file)
@@ -39,9 +39,9 @@ class NoPublishAuditService : PublishAuditService {
         log.info("Audit service is disabled")
     }
 
-    override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
+    override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
     }
 
-    override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+    override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
     }
 }
index 72f4931..67473c8 100644 (file)
@@ -20,6 +20,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 
 interface PublishAuditService {
-    suspend fun publish(executionServiceInput: ExecutionServiceInput)
-    fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput)
+    suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput)
+    suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput)
 }
index 1914562..70e1ed0 100644 (file)
@@ -16,7 +16,6 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
-import io.mockk.verify
 import io.mockk.coVerify
 import io.mockk.Runs
 import io.mockk.coEvery
@@ -102,7 +101,7 @@ class ExecutionServiceHandlerTest {
                 publishAuditService
         )
 
-        coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs
+        coEvery { publishAuditService.publishExecutionInput(ExecutionServiceInput()) } just Runs
 
         var executionServiceOutput: ExecutionServiceOutput? = null
         runBlocking {
@@ -110,11 +109,8 @@ class ExecutionServiceHandlerTest {
         }
 
         coVerify {
-            publishAuditService.publish(executionServiceInput)
-        }
-
-        verify {
-            publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput!!)
+            publishAuditService.publishExecutionInput(executionServiceInput)
+            publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput!!)
         }
     }
 }