X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ms%2Fblueprintsprocessor%2Fmodules%2Finbounds%2Fselfservice-api%2Fsrc%2Fmain%2Fkotlin%2Forg%2Fonap%2Fccsdk%2Fcds%2Fblueprintsprocessor%2Fselfservice%2Fapi%2FExecutionServiceHandler.kt;h=89a9637279712809b46fa5527701e429a1991851;hb=refs%2Fchanges%2F59%2F114359%2F1;hp=a81d35eac4add8709ad397a8e027a6f065694fb4;hpb=dd710215139505f26f052a7dbdcdb5bf7f2e0532;p=ccsdk%2Fcds.git diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index a81d35eac..89a963727 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -18,68 +18,48 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api import io.grpc.stub.StreamObserver +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch -import kotlinx.coroutines.reactive.awaitSingle -import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.* -import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.toProto +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status +import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS +import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType -import org.onap.ccsdk.cds.controllerblueprints.core.* -import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintPathConfiguration -import org.onap.ccsdk.cds.controllerblueprints.core.data.ErrorCode +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants +import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService -import org.onap.ccsdk.cds.controllerblueprints.core.scripts.BluePrintCompileCache -import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintFileUtils +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils import org.slf4j.LoggerFactory -import org.springframework.http.codec.multipart.FilePart import org.springframework.stereotype.Service -import java.io.File -import java.io.IOException -import java.util.* import java.util.stream.Collectors @Service -class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintPathConfiguration, - private val blueprintsProcessorCatalogService: BluePrintCatalogService, - private val bluePrintWorkflowExecutionService - : BluePrintWorkflowExecutionService) { +class ExecutionServiceHandler( + private val bluePrintLoadConfiguration: BluePrintLoadConfiguration, + private val blueprintsProcessorCatalogService: BluePrintCatalogService, + private val bluePrintWorkflowExecutionService: + BluePrintWorkflowExecutionService, + private val publishAuditService: PublishAuditService, + private val meterRegistry: MeterRegistry +) { private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString()) - suspend fun upload(filePart: FilePart): String { - val saveId = UUID.randomUUID().toString() - val blueprintArchive = normalizedPathName(bluePrintPathConfiguration.blueprintArchivePath, saveId) - val blueprintWorking = normalizedPathName(bluePrintPathConfiguration.blueprintWorkingPath, saveId) - try { - - val compressedFile = normalizedFile(blueprintArchive, "cba.zip") - compressedFile.parentFile.reCreateNBDirs() - // Copy the File Part to Local File - copyFromFilePart(filePart, compressedFile) - // Save the Copied file to Database - return blueprintsProcessorCatalogService.saveToDatabase(saveId, compressedFile, true) - } catch (e: IOException) { - throw BluePrintException(ErrorCode.IO_FILE_INTERRUPT.value, - "Error in Upload CBA: ${e.message}", e) - } finally { - // Clean blueprint script cache - val cacheKey = BluePrintFileUtils - .compileCacheKey(normalizedPathName(bluePrintPathConfiguration.blueprintWorkingPath,saveId)) - BluePrintCompileCache.cleanClassLoader(cacheKey) - deleteNBDir(blueprintArchive) - deleteNBDir(blueprintWorking) - } - } - - suspend fun remove(name: String, version: String) { - blueprintsProcessorCatalogService.deleteFromDatabase(name, version) - } - - suspend fun process(executionServiceInput: ExecutionServiceInput, - responseObserver: StreamObserver) { + suspend fun process( + executionServiceInput: ExecutionServiceInput, + responseObserver: StreamObserver + ) { when { executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> { GlobalScope.launch(Dispatchers.Default) { @@ -94,43 +74,82 @@ class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintP responseObserver.onNext(executionServiceOutput.toProto()) responseObserver.onCompleted() } - else -> responseObserver.onNext(response(executionServiceInput, - "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", - true).toProto()); + else -> { + publishAuditService.publishExecutionInput(executionServiceInput) + val executionServiceOutput = response( + executionServiceInput, + "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", + true + ) + meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment() + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput) + responseObserver.onNext( + executionServiceOutput.toProto() + ) + } } } suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { val requestId = executionServiceInput.commonHeader.requestId - log.info("processing request id $requestId") val actionIdentifiers = executionServiceInput.actionIdentifiers val blueprintName = actionIdentifiers.blueprintName val blueprintVersion = actionIdentifiers.blueprintVersion - try { - val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) - log.info("blueprint base path $basePath") - val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) + lateinit var executionServiceOutput: ExecutionServiceOutput - val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService, - executionServiceInput, hashMapOf()) + log.info("processing request id $requestId") + + // Audit input + publishAuditService.publishExecutionInput(executionServiceInput) - val errors = blueprintRuntimeService.getBluePrintError().errors - if (errors.isNotEmpty()) { - val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", ")) - setErrorStatus(errorMessage, output.status) + val sample = Timer.start() + try { + /** Check Blueprint is needed for this request */ + if (checkServiceFunction(executionServiceInput)) { + executionServiceOutput = executeServiceFunction(executionServiceInput) + } else { + val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) + log.info("blueprint base path $basePath") + + val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) + + executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow( + blueprintRuntimeService, + executionServiceInput, hashMapOf() + ) + + val errors = blueprintRuntimeService.getBluePrintError().errors + if (errors.isNotEmpty()) { + val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", ")) + setErrorStatus(errorMessage, executionServiceOutput.status) + } } - return output } catch (e: Exception) { log.error("fail processing request id $requestId", e) - return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) + executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } + // Update process metrics + sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput))) + meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment() + + // Audit output + publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput) + return executionServiceOutput + } + + /** If the blueprint name is default, It means no blueprint is needed for the execution */ + fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean { + return executionServiceInput.actionIdentifiers.blueprintName == "default" } - private suspend fun copyFromFilePart(filePart: FilePart, targetFile: File): File { - return filePart.transferTo(targetFile) - .thenReturn(targetFile) - .awaitSingle() + /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */ + suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { + val actionName = executionServiceInput.actionIdentifiers.actionName + val instance = BluePrintDependencyService.instance(actionName) + checkNotNull(instance) { "failed to initialize service function($actionName)" } + instance.actionName = actionName + return instance.applyNB(executionServiceInput) } private fun setErrorStatus(errorMessage: String, status: Status) { @@ -140,8 +159,11 @@ class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintP status.message = BluePrintConstants.STATUS_FAILURE } - private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "", - failure: Boolean = false): ExecutionServiceOutput { + private fun response( + executionServiceInput: ExecutionServiceInput, + errorMessage: String = "", + failure: Boolean = false + ): ExecutionServiceOutput { val executionServiceOutput = ExecutionServiceOutput() executionServiceOutput.commonHeader = executionServiceInput.commonHeader executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers @@ -160,5 +182,4 @@ class ExecutionServiceHandler(private val bluePrintPathConfiguration: BluePrintP return executionServiceOutput } - }