Blueprints Processor Metrics
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / ExecutionServiceHandler.kt
index f3af254..89a9637 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
-import com.fasterxml.jackson.databind.node.JsonNodeFactory
 import io.grpc.stub.StreamObserver
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.Timer
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
-import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintCoreConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.saveCBAFile
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
 import org.slf4j.LoggerFactory
-import org.springframework.http.codec.multipart.FilePart
 import org.springframework.stereotype.Service
-import reactor.core.publisher.Mono
 import java.util.stream.Collectors
 
 @Service
-class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration,
-                              private val bluePrintCatalogService: BluePrintCatalogService,
-                              private val bluePrintWorkflowExecutionService
-                              : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>) {
+class ExecutionServiceHandler(
+    private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
+    private val blueprintsProcessorCatalogService: BluePrintCatalogService,
+    private val bluePrintWorkflowExecutionService:
+        BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+    private val publishAuditService: PublishAuditService,
+    private val meterRegistry: MeterRegistry
+) {
 
     private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
 
-    fun upload(filePart: FilePart): Mono<String> {
-        try {
-            val archivedPath = BluePrintFileUtils.getCbaStorageDirectory(bluePrintCoreConfiguration.archivePath)
-            val cbaPath = saveCBAFile(filePart, archivedPath)
-            bluePrintCatalogService.saveToDatabase(cbaPath.toFile()).let {
-                return Mono.just("{\"status\": \"Successfully uploaded blueprint with id($it)\"}")
-            }
-        } catch (e: Exception) {
-            return Mono.error<String>(BluePrintException("Error uploading the CBA file.", e))
-        }
-    }
-
-    suspend fun process(executionServiceInput: ExecutionServiceInput,
-                        responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>) {
+    suspend fun process(
+        executionServiceInput: ExecutionServiceInput,
+        responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
+    ) {
         when {
             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
                 GlobalScope.launch(Dispatchers.Default) {
@@ -79,36 +74,82 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC
                 responseObserver.onNext(executionServiceOutput.toProto())
                 responseObserver.onCompleted()
             }
-            else -> responseObserver.onNext(response(executionServiceInput,
-                "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
-                true).toProto());
+            else -> {
+                publishAuditService.publishExecutionInput(executionServiceInput)
+                val executionServiceOutput = response(
+                    executionServiceInput,
+                    "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
+                    true
+                )
+                meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+                publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+                responseObserver.onNext(
+                    executionServiceOutput.toProto()
+                )
+            }
         }
     }
 
     suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         val requestId = executionServiceInput.commonHeader.requestId
-        log.info("processing request id $requestId")
-
         val actionIdentifiers = executionServiceInput.actionIdentifiers
-
         val blueprintName = actionIdentifiers.blueprintName
         val blueprintVersion = actionIdentifiers.blueprintVersion
 
-        val basePath = bluePrintCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-        log.info("blueprint base path $basePath")
+        lateinit var executionServiceOutput: ExecutionServiceOutput
 
-        val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+        log.info("processing request id $requestId")
 
-        val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService,
-            executionServiceInput, hashMapOf())
+        // Audit input
+        publishAuditService.publishExecutionInput(executionServiceInput)
 
-        val errors = blueprintRuntimeService.getBluePrintError().errors
-        if (errors.isNotEmpty()) {
-            val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
-            setErrorStatus(errorMessage, output.status)
+        val sample = Timer.start()
+        try {
+            /** Check Blueprint is needed for this request */
+            if (checkServiceFunction(executionServiceInput)) {
+                executionServiceOutput = executeServiceFunction(executionServiceInput)
+            } else {
+                val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+                log.info("blueprint base path $basePath")
+
+                val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+
+                executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+                    blueprintRuntimeService,
+                    executionServiceInput, hashMapOf()
+                )
+
+                val errors = blueprintRuntimeService.getBluePrintError().errors
+                if (errors.isNotEmpty()) {
+                    val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
+                    setErrorStatus(errorMessage, executionServiceOutput.status)
+                }
+            }
+        } catch (e: Exception) {
+            log.error("fail processing request id $requestId", e)
+            executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
         }
+        // Update process metrics
+        sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
+        meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+
+        // Audit output
+        publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+        return executionServiceOutput
+    }
 
-        return output
+    /** If the blueprint name is default, It means no blueprint is needed for the execution */
+    fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean {
+        return executionServiceInput.actionIdentifiers.blueprintName == "default"
+    }
+
+    /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */
+    suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
+        val actionName = executionServiceInput.actionIdentifiers.actionName
+        val instance = BluePrintDependencyService.instance<AbstractServiceFunction>(actionName)
+        checkNotNull(instance) { "failed to initialize service function($actionName)" }
+        instance.actionName = actionName
+        return instance.applyNB(executionServiceInput)
     }
 
     private fun setErrorStatus(errorMessage: String, status: Status) {
@@ -118,12 +159,15 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC
         status.message = BluePrintConstants.STATUS_FAILURE
     }
 
-    private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "",
-                         failure: Boolean = false): ExecutionServiceOutput {
+    private fun response(
+        executionServiceInput: ExecutionServiceInput,
+        errorMessage: String = "",
+        failure: Boolean = false
+    ): ExecutionServiceOutput {
         val executionServiceOutput = ExecutionServiceOutput()
         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
-        executionServiceOutput.payload = JsonNodeFactory.instance.objectNode()
+        executionServiceOutput.payload = executionServiceInput.payload
 
         val status = Status()
         if (failure) {
@@ -138,5 +182,4 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC
 
         return executionServiceOutput
     }
-
-}
\ No newline at end of file
+}