Blueprints Processor Metrics
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / ExecutionServiceHandler.kt
index 9524e37..89a9637 100644 (file)
@@ -18,6 +18,8 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
 import io.grpc.stub.StreamObserver
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.Timer
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
@@ -27,6 +29,9 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInpu
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
@@ -44,7 +49,9 @@ class ExecutionServiceHandler(
     private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
     private val blueprintsProcessorCatalogService: BluePrintCatalogService,
     private val bluePrintWorkflowExecutionService:
-    BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>
+        BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+    private val publishAuditService: PublishAuditService,
+    private val meterRegistry: MeterRegistry
 ) {
 
     private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
@@ -67,33 +74,47 @@ class ExecutionServiceHandler(
                 responseObserver.onNext(executionServiceOutput.toProto())
                 responseObserver.onCompleted()
             }
-            else -> responseObserver.onNext(
-                response(
+            else -> {
+                publishAuditService.publishExecutionInput(executionServiceInput)
+                val executionServiceOutput = response(
                     executionServiceInput,
                     "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
                     true
-                ).toProto()
-            )
+                )
+                meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+                publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+                responseObserver.onNext(
+                    executionServiceOutput.toProto()
+                )
+            }
         }
     }
 
     suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         val requestId = executionServiceInput.commonHeader.requestId
-        log.info("processing request id $requestId")
         val actionIdentifiers = executionServiceInput.actionIdentifiers
         val blueprintName = actionIdentifiers.blueprintName
         val blueprintVersion = actionIdentifiers.blueprintVersion
+
+        lateinit var executionServiceOutput: ExecutionServiceOutput
+
+        log.info("processing request id $requestId")
+
+        // Audit input
+        publishAuditService.publishExecutionInput(executionServiceInput)
+
+        val sample = Timer.start()
         try {
             /** Check Blueprint is needed for this request */
             if (checkServiceFunction(executionServiceInput)) {
-                return executeServiceFunction(executionServiceInput)
+                executionServiceOutput = executeServiceFunction(executionServiceInput)
             } else {
                 val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
                 log.info("blueprint base path $basePath")
 
                 val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
 
-                val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+                executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
                     blueprintRuntimeService,
                     executionServiceInput, hashMapOf()
                 )
@@ -101,14 +122,20 @@ class ExecutionServiceHandler(
                 val errors = blueprintRuntimeService.getBluePrintError().errors
                 if (errors.isNotEmpty()) {
                     val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
-                    setErrorStatus(errorMessage, output.status)
+                    setErrorStatus(errorMessage, executionServiceOutput.status)
                 }
-                return output
             }
         } catch (e: Exception) {
             log.error("fail processing request id $requestId", e)
-            return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
+            executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
         }
+        // Update process metrics
+        sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
+        meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+
+        // Audit output
+        publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+        return executionServiceOutput
     }
 
     /** If the blueprint name is default, It means no blueprint is needed for the execution */