Blueprints Processor Metrics
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / ExecutionServiceHandler.kt
index 20af589..89a9637 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
 import io.grpc.stub.StreamObserver
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.Timer
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.*
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
 import org.slf4j.LoggerFactory
 import org.springframework.stereotype.Service
 import java.util.stream.Collectors
 
 @Service
-class ExecutionServiceHandler(private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
-                              private val blueprintsProcessorCatalogService: BluePrintCatalogService,
-                              private val bluePrintWorkflowExecutionService
-                              : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>) {
+class ExecutionServiceHandler(
+    private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
+    private val blueprintsProcessorCatalogService: BluePrintCatalogService,
+    private val bluePrintWorkflowExecutionService:
+        BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+    private val publishAuditService: PublishAuditService,
+    private val meterRegistry: MeterRegistry
+) {
 
     private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
 
-    suspend fun process(executionServiceInput: ExecutionServiceInput,
-                        responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>) {
+    suspend fun process(
+        executionServiceInput: ExecutionServiceInput,
+        responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
+    ) {
         when {
             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
                 GlobalScope.launch(Dispatchers.Default) {
@@ -57,37 +74,82 @@ class ExecutionServiceHandler(private val bluePrintLoadConfiguration: BluePrintL
                 responseObserver.onNext(executionServiceOutput.toProto())
                 responseObserver.onCompleted()
             }
-            else -> responseObserver.onNext(response(executionServiceInput,
+            else -> {
+                publishAuditService.publishExecutionInput(executionServiceInput)
+                val executionServiceOutput = response(
+                    executionServiceInput,
                     "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
-                    true).toProto());
+                    true
+                )
+                meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+                publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+                responseObserver.onNext(
+                    executionServiceOutput.toProto()
+                )
+            }
         }
     }
 
     suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         val requestId = executionServiceInput.commonHeader.requestId
-        log.info("processing request id $requestId")
         val actionIdentifiers = executionServiceInput.actionIdentifiers
         val blueprintName = actionIdentifiers.blueprintName
         val blueprintVersion = actionIdentifiers.blueprintVersion
+
+        lateinit var executionServiceOutput: ExecutionServiceOutput
+
+        log.info("processing request id $requestId")
+
+        // Audit input
+        publishAuditService.publishExecutionInput(executionServiceInput)
+
+        val sample = Timer.start()
         try {
-            val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-            log.info("blueprint base path $basePath")
+            /** Check Blueprint is needed for this request */
+            if (checkServiceFunction(executionServiceInput)) {
+                executionServiceOutput = executeServiceFunction(executionServiceInput)
+            } else {
+                val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+                log.info("blueprint base path $basePath")
 
-            val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+                val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
 
-            val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService,
-                    executionServiceInput, hashMapOf())
+                executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+                    blueprintRuntimeService,
+                    executionServiceInput, hashMapOf()
+                )
 
-            val errors = blueprintRuntimeService.getBluePrintError().errors
-            if (errors.isNotEmpty()) {
-                val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
-                setErrorStatus(errorMessage, output.status)
+                val errors = blueprintRuntimeService.getBluePrintError().errors
+                if (errors.isNotEmpty()) {
+                    val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
+                    setErrorStatus(errorMessage, executionServiceOutput.status)
+                }
             }
-            return output
         } catch (e: Exception) {
             log.error("fail processing request id $requestId", e)
-            return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
+            executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
         }
+        // Update process metrics
+        sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
+        meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+
+        // Audit output
+        publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+        return executionServiceOutput
+    }
+
+    /** If the blueprint name is default, It means no blueprint is needed for the execution */
+    fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean {
+        return executionServiceInput.actionIdentifiers.blueprintName == "default"
+    }
+
+    /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */
+    suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
+        val actionName = executionServiceInput.actionIdentifiers.actionName
+        val instance = BluePrintDependencyService.instance<AbstractServiceFunction>(actionName)
+        checkNotNull(instance) { "failed to initialize service function($actionName)" }
+        instance.actionName = actionName
+        return instance.applyNB(executionServiceInput)
     }
 
     private fun setErrorStatus(errorMessage: String, status: Status) {
@@ -97,8 +159,11 @@ class ExecutionServiceHandler(private val bluePrintLoadConfiguration: BluePrintL
         status.message = BluePrintConstants.STATUS_FAILURE
     }
 
-    private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "",
-                         failure: Boolean = false): ExecutionServiceOutput {
+    private fun response(
+        executionServiceInput: ExecutionServiceInput,
+        errorMessage: String = "",
+        failure: Boolean = false
+    ): ExecutionServiceOutput {
         val executionServiceOutput = ExecutionServiceOutput()
         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
@@ -117,5 +182,4 @@ class ExecutionServiceHandler(private val bluePrintLoadConfiguration: BluePrintL
 
         return executionServiceOutput
     }
-
 }