package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.coroutineScope
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
-import org.onap.ccsdk.cds.controllerblueprints.core.*
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
+import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
+import org.onap.ccsdk.cds.controllerblueprints.core.checkNotEmpty
import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
-import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
-import org.onap.ccsdk.cds.controllerblueprints.core.service.*
-import org.springframework.beans.factory.config.ConfigurableBeanFactory
-import org.springframework.context.annotation.Scope
+import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBlueprintWorkFlowService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintRuntimeService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
+import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
+import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
import org.springframework.stereotype.Service
+import kotlin.coroutines.CoroutineContext
@Service("imperativeWorkflowExecutionService")
class ImperativeWorkflowExecutionService(
- private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>)
- : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+ private val nodeTemplateExecutionService: NodeTemplateExecutionService
+) :
+ BlueprintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
- override suspend fun executeBluePrintWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>,
- executionServiceInput: ExecutionServiceInput,
- properties: MutableMap<String, Any>): ExecutionServiceOutput {
+ override suspend fun executeBlueprintWorkflow(
+ bluePrintRuntimeService: BlueprintRuntimeService<*>,
+ executionServiceInput: ExecutionServiceInput,
+ properties: MutableMap<String, Any>
+ ): ExecutionServiceOutput {
val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
val graph = bluePrintContext.workflowByName(workflowName).asGraph()
- val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
- imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService,
- executionServiceInput, deferredOutput)
- return deferredOutput.await()
+ return coroutineScope {
+ ImperativeBlueprintWorkflowService(
+ nodeTemplateExecutionService,
+ this.coroutineContext[MDCContext]
+ )
+ }.let { it.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput) }
}
}
-@Service
-@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService)
- : AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
- val log = logger(ImperativeBluePrintWorkflowService::class)
+open class ImperativeBlueprintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService, private val mdcContext: CoroutineContext?) :
+ AbstractBlueprintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
- lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
+ final override val coroutineContext: CoroutineContext
+ get() = mdcContext?.let { super.coroutineContext + it } ?: super.coroutineContext
+
+ val log = logger(ImperativeBlueprintWorkflowService::class)
+
+ lateinit var bluePrintRuntimeService: BlueprintRuntimeService<*>
lateinit var executionServiceInput: ExecutionServiceInput
lateinit var workflowName: String
- lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput>
- override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
- input: ExecutionServiceInput,
- output: CompletableDeferred<ExecutionServiceOutput>) {
+ override suspend fun executeWorkflow(
+ graph: Graph,
+ bluePrintRuntimeService: BlueprintRuntimeService<*>,
+ input: ExecutionServiceInput
+ ): ExecutionServiceOutput {
this.graph = graph
this.bluePrintRuntimeService = bluePrintRuntimeService
this.executionServiceInput = input
this.workflowName = this.executionServiceInput.actionIdentifiers.actionName
- this.deferredExecutionServiceOutput = output
this.workflowId = bluePrintRuntimeService.id()
+ val output = CompletableDeferred<ExecutionServiceOutput>()
val startMessage = WorkflowExecuteMessage(input, output)
- workflowActor().send(startMessage)
+ val workflowActor = workflowActor()
+ if (!workflowActor.isClosedForSend) {
+ workflowActor.send(startMessage)
+ } else {
+ throw BlueprintProcessorException("workflow($workflowActor) actor is closed")
+ }
+ return output.await()
}
override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel {
return EdgeLabel.SUCCESS
}
- override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput {
- val wfStatus = if (exception != null) {
- val status = Status()
- status.message = BluePrintConstants.STATUS_FAILURE
- status.errorMessage = exception.message
- status
- } else {
- val status = Status()
- status.message = BluePrintConstants.STATUS_SUCCESS
- status
+ override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput {
+ val wfStatus = Status().apply {
+ if (exceptions.isNotEmpty()) {
+ exceptions.forEach {
+ val errorMessage = it.message ?: ""
+ bluePrintRuntimeService.getBlueprintError().addError(errorMessage)
+ log.error("workflow($workflowId) exception :", it)
+ }
+ message = BlueprintConstants.STATUS_FAILURE
+ } else {
+ message = BlueprintConstants.STATUS_SUCCESS
+ }
+ eventType = EventType.EVENT_COMPONENT_EXECUTED.name
}
return ExecutionServiceOutput().apply {
commonHeader = executionServiceInput.commonHeader
}
}
- override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
- : NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
- val nodeOutput = ExecutionServiceOutput().apply {
- commonHeader = executionServiceInput.commonHeader
- actionIdentifiers = executionServiceInput.actionIdentifiers
+ override suspend fun prepareNodeExecutionMessage(node: Graph.Node):
+ NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+ val nodeOutput = ExecutionServiceOutput().apply {
+ commonHeader = executionServiceInput.commonHeader
+ actionIdentifiers = executionServiceInput.actionIdentifiers
+ }
+ return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
}
- return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
- }
- override suspend fun prepareNodeSkipMessage(node: Graph.Node)
- : NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
- val nodeOutput = ExecutionServiceOutput().apply {
- commonHeader = executionServiceInput.commonHeader
- actionIdentifiers = executionServiceInput.actionIdentifiers
+ override suspend fun prepareNodeSkipMessage(node: Graph.Node):
+ NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+ val nodeOutput = ExecutionServiceOutput().apply {
+ commonHeader = executionServiceInput.commonHeader
+ actionIdentifiers = executionServiceInput.actionIdentifiers
+ }
+ return NodeSkipMessage(node, executionServiceInput, nodeOutput)
}
- return NodeSkipMessage(node, executionServiceInput, nodeOutput)
- }
- override suspend fun executeNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- nodeOutput: ExecutionServiceOutput): EdgeLabel {
- log.info("Executing workflow($workflowName[${this.workflowId}])'s step($${node.id})")
+ override suspend fun executeNode(
+ node: Graph.Node,
+ nodeInput: ExecutionServiceInput,
+ nodeOutput: ExecutionServiceOutput
+ ): EdgeLabel {
+ log.info("Executing workflow($workflowName[${this.workflowId}])'s step(${node.id})")
val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
val nodeTemplateName = step.target!!
+
/** execute node template */
val executionServiceOutput = nodeTemplateExecutionService
- .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
+ .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
return when (executionServiceOutput.status.message) {
- BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
+ BlueprintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
else -> EdgeLabel.SUCCESS
}
}
- override suspend fun skipNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- nodeOutput: ExecutionServiceOutput): EdgeLabel {
+ override suspend fun skipNode(
+ node: Graph.Node,
+ nodeInput: ExecutionServiceInput,
+ nodeOutput: ExecutionServiceOutput
+ ): EdgeLabel {
return EdgeLabel.SUCCESS
}
- override suspend fun cancelNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- nodeOutput: ExecutionServiceOutput): EdgeLabel {
+ override suspend fun cancelNode(
+ node: Graph.Node,
+ nodeInput: ExecutionServiceInput,
+ nodeOutput: ExecutionServiceOutput
+ ): EdgeLabel {
TODO("not implemented")
}
- override suspend fun restartNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
- nodeOutput: ExecutionServiceOutput): EdgeLabel {
+ override suspend fun restartNode(
+ node: Graph.Node,
+ nodeInput: ExecutionServiceInput,
+ nodeOutput: ExecutionServiceOutput
+ ): EdgeLabel {
TODO("not implemented")
}
-}
\ No newline at end of file
+}