Blueprints Processor Metrics
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / ExecutionServiceHandler.kt
index a81d35e..89a9637 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
 import io.grpc.stub.StreamObserver
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.Timer
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
-import kotlinx.coroutines.reactive.awaitSingle
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.*
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
-import org.onap.ccsdk.cds.controllerblueprints.core.*
-import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintPathConfiguration
-import org.onap.ccsdk.cds.controllerblueprints.core.data.ErrorCode
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
-import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
 import org.slf4j.LoggerFactory
-import org.springframework.http.codec.multipart.FilePart
 import org.springframework.stereotype.Service
-import java.io.File
-import java.io.IOException
-import java.util.*
 import java.util.stream.Collectors
 
 @Service
-class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintPathConfiguration,
-                              private val blueprintsProcessorCatalogService: BluePrintCatalogService,
-                              private val bluePrintWorkflowExecutionService
-                              : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>) {
+class ExecutionServiceHandler(
+    private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
+    private val blueprintsProcessorCatalogService: BluePrintCatalogService,
+    private val bluePrintWorkflowExecutionService:
+        BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+    private val publishAuditService: PublishAuditService,
+    private val meterRegistry: MeterRegistry
+) {
 
     private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
 
-    suspend fun upload(filePart: FilePart): String {
-        val saveId = UUID.randomUUID().toString()
-        val blueprintArchive = normalizedPathName(bluePrintPathConfiguration.blueprintArchivePath, saveId)
-        val blueprintWorking = normalizedPathName(bluePrintPathConfiguration.blueprintWorkingPath, saveId)
-        try {
-
-            val compressedFile = normalizedFile(blueprintArchive, "cba.zip")
-            compressedFile.parentFile.reCreateNBDirs()
-            // Copy the File Part to Local File
-            copyFromFilePart(filePart, compressedFile)
-            // Save the Copied file to Database
-            return blueprintsProcessorCatalogService.saveToDatabase(saveId, compressedFile, true)
-        } catch (e: IOException) {
-            throw BluePrintException(ErrorCode.IO_FILE_INTERRUPT.value,
-                "Error in Upload CBA: ${e.message}", e)
-        } finally {
-            // Clean blueprint script cache
-            val cacheKey = BluePrintFileUtils
-                    .compileCacheKey(normalizedPathName(bluePrintPathConfiguration.blueprintWorkingPath,saveId))
-            BluePrintCompileCache.cleanClassLoader(cacheKey)
-            deleteNBDir(blueprintArchive)
-            deleteNBDir(blueprintWorking)
-        }
-    }
-
-    suspend fun remove(name: String, version: String) {
-        blueprintsProcessorCatalogService.deleteFromDatabase(name, version)
-    }
-
-    suspend fun process(executionServiceInput: ExecutionServiceInput,
-                        responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>) {
+    suspend fun process(
+        executionServiceInput: ExecutionServiceInput,
+        responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
+    ) {
         when {
             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
                 GlobalScope.launch(Dispatchers.Default) {
@@ -94,43 +74,82 @@ class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintP
                 responseObserver.onNext(executionServiceOutput.toProto())
                 responseObserver.onCompleted()
             }
-            else -> responseObserver.onNext(response(executionServiceInput,
-                "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
-                true).toProto());
+            else -> {
+                publishAuditService.publishExecutionInput(executionServiceInput)
+                val executionServiceOutput = response(
+                    executionServiceInput,
+                    "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
+                    true
+                )
+                meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+                publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+                responseObserver.onNext(
+                    executionServiceOutput.toProto()
+                )
+            }
         }
     }
 
     suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         val requestId = executionServiceInput.commonHeader.requestId
-        log.info("processing request id $requestId")
         val actionIdentifiers = executionServiceInput.actionIdentifiers
         val blueprintName = actionIdentifiers.blueprintName
         val blueprintVersion = actionIdentifiers.blueprintVersion
-        try {
-            val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
-            log.info("blueprint base path $basePath")
 
-            val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+        lateinit var executionServiceOutput: ExecutionServiceOutput
 
-            val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService,
-                executionServiceInput, hashMapOf())
+        log.info("processing request id $requestId")
+
+        // Audit input
+        publishAuditService.publishExecutionInput(executionServiceInput)
 
-            val errors = blueprintRuntimeService.getBluePrintError().errors
-            if (errors.isNotEmpty()) {
-                val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
-                setErrorStatus(errorMessage, output.status)
+        val sample = Timer.start()
+        try {
+            /** Check Blueprint is needed for this request */
+            if (checkServiceFunction(executionServiceInput)) {
+                executionServiceOutput = executeServiceFunction(executionServiceInput)
+            } else {
+                val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+                log.info("blueprint base path $basePath")
+
+                val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+
+                executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+                    blueprintRuntimeService,
+                    executionServiceInput, hashMapOf()
+                )
+
+                val errors = blueprintRuntimeService.getBluePrintError().errors
+                if (errors.isNotEmpty()) {
+                    val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
+                    setErrorStatus(errorMessage, executionServiceOutput.status)
+                }
             }
-            return output
         } catch (e: Exception) {
             log.error("fail processing request id $requestId", e)
-            return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
+            executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
         }
+        // Update process metrics
+        sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
+        meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+
+        // Audit output
+        publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+        return executionServiceOutput
+    }
+
+    /** If the blueprint name is default, It means no blueprint is needed for the execution */
+    fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean {
+        return executionServiceInput.actionIdentifiers.blueprintName == "default"
     }
 
-    private suspend fun copyFromFilePart(filePart: FilePart, targetFile: File): File {
-        return filePart.transferTo(targetFile)
-            .thenReturn(targetFile)
-            .awaitSingle()
+    /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */
+    suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
+        val actionName = executionServiceInput.actionIdentifiers.actionName
+        val instance = BluePrintDependencyService.instance<AbstractServiceFunction>(actionName)
+        checkNotNull(instance) { "failed to initialize service function($actionName)" }
+        instance.actionName = actionName
+        return instance.applyNB(executionServiceInput)
     }
 
     private fun setErrorStatus(errorMessage: String, status: Status) {
@@ -140,8 +159,11 @@ class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintP
         status.message = BluePrintConstants.STATUS_FAILURE
     }
 
-    private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "",
-                         failure: Boolean = false): ExecutionServiceOutput {
+    private fun response(
+        executionServiceInput: ExecutionServiceInput,
+        errorMessage: String = "",
+        failure: Boolean = false
+    ): ExecutionServiceOutput {
         val executionServiceOutput = ExecutionServiceOutput()
         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
@@ -160,5 +182,4 @@ class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintP
 
         return executionServiceOutput
     }
-
 }