Kafka Audit Service : CorrelationUUID from request is not matching the correct respon... 91/107491/1
authorJulien Fontaine <julien.fontaine@bell.ca>
Mon, 11 May 2020 18:38:01 +0000 (14:38 -0400)
committerJulien Fontaine <julien.fontaine@bell.ca>
Mon, 11 May 2020 18:43:56 +0000 (14:43 -0400)
Moved out CorrelationUUID linking between the request and the response from the Kafka Audit Service to the ExecutionServiceHandler.
This prevents the race condition happening when several ExecutionServiceOutputs try to set the CorrelationUUID related to their ExecutionServiceInput.

Issue-ID: CCSDK-2370
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I0c5934d4486961fbfcb34fd2d2492cd843350025

ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt

index 74c4b00..e9d0b7b 100644 (file)
@@ -75,7 +75,7 @@ class ExecutionServiceHandler(
                         "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
                         true
                 )
-                publishAuditService.publish(executionServiceOutput)
+                publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
                 responseObserver.onNext(
                         executionServiceOutput.toProto()
                 )
@@ -121,7 +121,7 @@ class ExecutionServiceHandler(
             executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
         }
 
-        publishAuditService.publish(executionServiceOutput)
+        publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
         return executionServiceOutput
     }
 
index 1c5d47c..9f406f7 100644 (file)
@@ -51,7 +51,6 @@ class KafkaPublishAuditService(
 ) : PublishAuditService {
     private var inputInstance: BlueprintMessageProducerService? = null
     private var outputInstance: BlueprintMessageProducerService? = null
-    private lateinit var correlationUUID: String
     private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString())
 
     companion object {
@@ -70,7 +69,6 @@ class KafkaPublishAuditService(
      * Sensitive data within the request are hidden.
      */
     override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
-        this.correlationUUID = executionServiceInput.correlationUUID
         val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
         this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
         this.inputInstance!!.sendMessage(secureExecutionServiceInput)
@@ -81,8 +79,8 @@ class KafkaPublishAuditService(
      * The correlation UUID is used to link the output to its input.
      * A correlation UUID is added to link the input to its output.
      */
-    override fun publish(executionServiceOutput: ExecutionServiceOutput) {
-        executionServiceOutput.correlationUUID = this.correlationUUID
+    override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
+        executionServiceOutput.correlationUUID = correlationUUID
         this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
         this.outputInstance!!.sendMessage(executionServiceOutput)
     }
index 3f78200..eb66e41 100644 (file)
@@ -42,6 +42,6 @@ class NoPublishAuditService : PublishAuditService {
     override suspend fun publish(executionServiceInput: ExecutionServiceInput) {
     }
 
-    override fun publish(executionServiceOutput: ExecutionServiceOutput) {
+    override fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
     }
 }
index 535a5ea..72f4931 100644 (file)
@@ -21,5 +21,5 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp
 
 interface PublishAuditService {
     suspend fun publish(executionServiceInput: ExecutionServiceInput)
-    fun publish(executionServiceOutput: ExecutionServiceOutput)
+    fun publish(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput)
 }