package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
import io.grpc.stub.StreamObserver
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.Timer
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.*
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.stream.Collectors
@Service
-class ExecutionServiceHandler(private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
- private val blueprintsProcessorCatalogService: BluePrintCatalogService,
- private val bluePrintWorkflowExecutionService
- : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>) {
+class ExecutionServiceHandler(
+ private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
+ private val blueprintsProcessorCatalogService: BluePrintCatalogService,
+ private val bluePrintWorkflowExecutionService:
+ BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+ private val publishAuditService: PublishAuditService,
+ private val meterRegistry: MeterRegistry
+) {
private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
- suspend fun process(executionServiceInput: ExecutionServiceInput,
- responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>) {
+ suspend fun process(
+ executionServiceInput: ExecutionServiceInput,
+ responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
+ ) {
when {
executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
GlobalScope.launch(Dispatchers.Default) {
responseObserver.onNext(executionServiceOutput.toProto())
responseObserver.onCompleted()
}
- else -> responseObserver.onNext(response(executionServiceInput,
+ else -> {
+ publishAuditService.publishExecutionInput(executionServiceInput)
+ val executionServiceOutput = response(
+ executionServiceInput,
"Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
- true).toProto());
+ true
+ )
+ meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+ responseObserver.onNext(
+ executionServiceOutput.toProto()
+ )
+ }
}
}
suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
val requestId = executionServiceInput.commonHeader.requestId
- log.info("processing request id $requestId")
val actionIdentifiers = executionServiceInput.actionIdentifiers
val blueprintName = actionIdentifiers.blueprintName
val blueprintVersion = actionIdentifiers.blueprintVersion
+
+ lateinit var executionServiceOutput: ExecutionServiceOutput
+
+ log.info("processing request id $requestId")
+
+ // Audit input
+ publishAuditService.publishExecutionInput(executionServiceInput)
+
+ val sample = Timer.start()
try {
- val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
- log.info("blueprint base path $basePath")
+ /** Check Blueprint is needed for this request */
+ if (checkServiceFunction(executionServiceInput)) {
+ executionServiceOutput = executeServiceFunction(executionServiceInput)
+ } else {
+ val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
+ log.info("blueprint base path $basePath")
- val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
+ val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
- val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(blueprintRuntimeService,
- executionServiceInput, hashMapOf())
+ executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+ blueprintRuntimeService,
+ executionServiceInput, hashMapOf()
+ )
- val errors = blueprintRuntimeService.getBluePrintError().errors
- if (errors.isNotEmpty()) {
- val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
- setErrorStatus(errorMessage, output.status)
+ val errors = blueprintRuntimeService.getBluePrintError().errors
+ if (errors.isNotEmpty()) {
+ val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
+ setErrorStatus(errorMessage, executionServiceOutput.status)
+ }
}
- return output
} catch (e: Exception) {
log.error("fail processing request id $requestId", e)
- return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
+ executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
}
+ // Update process metrics
+ sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
+ meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+
+ // Audit output
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+ return executionServiceOutput
+ }
+
+ /** If the blueprint name is default, It means no blueprint is needed for the execution */
+ fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean {
+ return executionServiceInput.actionIdentifiers.blueprintName == "default"
+ }
+
+ /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */
+ suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
+ val actionName = executionServiceInput.actionIdentifiers.actionName
+ val instance = BluePrintDependencyService.instance<AbstractServiceFunction>(actionName)
+ checkNotNull(instance) { "failed to initialize service function($actionName)" }
+ instance.actionName = actionName
+ return instance.applyNB(executionServiceInput)
}
private fun setErrorStatus(errorMessage: String, status: Status) {
status.message = BluePrintConstants.STATUS_FAILURE
}
- private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "",
- failure: Boolean = false): ExecutionServiceOutput {
+ private fun response(
+ executionServiceInput: ExecutionServiceInput,
+ errorMessage: String = "",
+ failure: Boolean = false
+ ): ExecutionServiceOutput {
val executionServiceOutput = ExecutionServiceOutput()
executionServiceOutput.commonHeader = executionServiceInput.commonHeader
executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
return executionServiceOutput
}
-
}