package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
import io.grpc.stub.StreamObserver
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.Timer
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
+import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
private val blueprintsProcessorCatalogService: BluePrintCatalogService,
private val bluePrintWorkflowExecutionService:
- BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
- private val publishAuditService: PublishAuditService
+ BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
+ private val publishAuditService: PublishAuditService,
+ private val meterRegistry: MeterRegistry
) {
private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
responseObserver.onCompleted()
}
else -> {
- publishAuditService.publish(executionServiceInput)
+ publishAuditService.publishExecutionInput(executionServiceInput)
val executionServiceOutput = response(
- executionServiceInput,
- "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
- true
+ executionServiceInput,
+ "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
+ true
)
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+ meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
responseObserver.onNext(
- executionServiceOutput.toProto()
+ executionServiceOutput.toProto()
)
}
}
log.info("processing request id $requestId")
- try {
- publishAuditService.publish(executionServiceInput)
+ // Audit input
+ publishAuditService.publishExecutionInput(executionServiceInput)
+ val sample = Timer.start()
+ try {
/** Check Blueprint is needed for this request */
if (checkServiceFunction(executionServiceInput)) {
executionServiceOutput = executeServiceFunction(executionServiceInput)
log.error("fail processing request id $requestId", e)
executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
}
+ // Update process metrics
+ sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
+ meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
- publishAuditService.publish(executionServiceInput.correlationUUID, executionServiceOutput)
+ // Audit output
+ publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
return executionServiceOutput
}