package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
import io.grpc.stub.StreamObserver
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.Timer
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
private val blueprintsProcessorCatalogService: BluePrintCatalogService,
private val bluePrintWorkflowExecutionService:
- BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>
+ BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+ private val publishAuditService: PublishAuditService,
+ private val meterRegistry: MeterRegistry
) {
private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
responseObserver.onNext(executionServiceOutput.toProto())
responseObserver.onCompleted()
}
- else -> responseObserver.onNext(
- response(
+ else -> {
+ publishAuditService.publishExecutionInput(executionServiceInput)
+ val executionServiceOutput = response(
executionServiceInput,
"Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
true
- ).toProto()
- )
+ )
+ meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+ responseObserver.onNext(
+ executionServiceOutput.toProto()
+ )
+ }
}
}
suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
val requestId = executionServiceInput.commonHeader.requestId
- log.info("processing request id $requestId")
val actionIdentifiers = executionServiceInput.actionIdentifiers
val blueprintName = actionIdentifiers.blueprintName
val blueprintVersion = actionIdentifiers.blueprintVersion
+
+ lateinit var executionServiceOutput: ExecutionServiceOutput
+
+ log.info("processing request id $requestId")
+
+ // Audit input
+ publishAuditService.publishExecutionInput(executionServiceInput)
+
+ val sample = Timer.start()
try {
/** Check Blueprint is needed for this request */
if (checkServiceFunction(executionServiceInput)) {
- return executeServiceFunction(executionServiceInput)
+ executionServiceOutput = executeServiceFunction(executionServiceInput)
} else {
val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
log.info("blueprint base path $basePath")
val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
- val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
+ executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
blueprintRuntimeService,
executionServiceInput, hashMapOf()
)
val errors = blueprintRuntimeService.getBluePrintError().errors
if (errors.isNotEmpty()) {
val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
- setErrorStatus(errorMessage, output.status)
+ setErrorStatus(errorMessage, executionServiceOutput.status)
}
- return output
}
} catch (e: Exception) {
log.error("fail processing request id $requestId", e)
- return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
+ executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
}
+ // Update process metrics
+ sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
+ meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+
+ // Audit output
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
+ return executionServiceOutput
}
/** If the blueprint name is default, It means no blueprint is needed for the execution */