package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.coroutineScope
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
import org.onap.ccsdk.cds.controllerblueprints.core.checkNotEmpty
import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
-import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.isAcyclic
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBluePrintWorkFlowService
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBlueprintWorkFlowService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintRuntimeService
import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
import org.springframework.stereotype.Service
+import kotlin.coroutines.CoroutineContext
@Service("imperativeWorkflowExecutionService")
class ImperativeWorkflowExecutionService(
private val nodeTemplateExecutionService: NodeTemplateExecutionService
) :
- BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+ BlueprintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
- override suspend fun executeBluePrintWorkflow(
- bluePrintRuntimeService: BluePrintRuntimeService<*>,
+ override suspend fun executeBlueprintWorkflow(
+ bluePrintRuntimeService: BlueprintRuntimeService<*>,
executionServiceInput: ExecutionServiceInput,
properties: MutableMap<String, Any>
): ExecutionServiceOutput {
val graph = bluePrintContext.workflowByName(workflowName).asGraph()
- return ImperativeBluePrintWorkflowService(nodeTemplateExecutionService)
- .executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput)
+ if (!graph.isAcyclic()) {
+ throw BlueprintException("Imperative workflow must be acyclic. Check on_success/on_failure for circular references")
+ }
+
+ return coroutineScope {
+ ImperativeBlueprintWorkflowService(
+ nodeTemplateExecutionService,
+ this.coroutineContext[MDCContext]
+ )
+ }.let { it.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput) }
}
}
-open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService) :
- AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
+open class ImperativeBlueprintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService, private val mdcContext: CoroutineContext?) :
+ AbstractBlueprintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
- val log = logger(ImperativeBluePrintWorkflowService::class)
+ final override val coroutineContext: CoroutineContext
+ get() = mdcContext?.let { super.coroutineContext + it } ?: super.coroutineContext
- lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
+ val log = logger(ImperativeBlueprintWorkflowService::class)
+
+ lateinit var bluePrintRuntimeService: BlueprintRuntimeService<*>
lateinit var executionServiceInput: ExecutionServiceInput
lateinit var workflowName: String
override suspend fun executeWorkflow(
graph: Graph,
- bluePrintRuntimeService: BluePrintRuntimeService<*>,
+ bluePrintRuntimeService: BlueprintRuntimeService<*>,
input: ExecutionServiceInput
): ExecutionServiceOutput {
this.graph = graph
if (!workflowActor.isClosedForSend) {
workflowActor.send(startMessage)
} else {
- throw BluePrintProcessorException("workflow($workflowActor) actor is closed")
+ throw BlueprintProcessorException("workflow($workflowActor) actor is closed")
}
return output.await()
}
if (exceptions.isNotEmpty()) {
exceptions.forEach {
val errorMessage = it.message ?: ""
- bluePrintRuntimeService.getBluePrintError().addError(errorMessage)
+ bluePrintRuntimeService.getBlueprintError().addError(errorMessage, "workflow")
log.error("workflow($workflowId) exception :", it)
}
- message = BluePrintConstants.STATUS_FAILURE
+ message = BlueprintConstants.STATUS_FAILURE
} else {
- message = BluePrintConstants.STATUS_SUCCESS
+ message = BlueprintConstants.STATUS_SUCCESS
}
eventType = EventType.EVENT_COMPONENT_EXECUTED.name
}
}
override suspend fun prepareNodeExecutionMessage(node: Graph.Node):
- NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
- val nodeOutput = ExecutionServiceOutput().apply {
- commonHeader = executionServiceInput.commonHeader
- actionIdentifiers = executionServiceInput.actionIdentifiers
+ NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+ val nodeOutput = ExecutionServiceOutput().apply {
+ commonHeader = executionServiceInput.commonHeader
+ actionIdentifiers = executionServiceInput.actionIdentifiers
+ }
+ return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
}
- return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
- }
override suspend fun prepareNodeSkipMessage(node: Graph.Node):
- NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
- val nodeOutput = ExecutionServiceOutput().apply {
- commonHeader = executionServiceInput.commonHeader
- actionIdentifiers = executionServiceInput.actionIdentifiers
+ NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+ val nodeOutput = ExecutionServiceOutput().apply {
+ commonHeader = executionServiceInput.commonHeader
+ actionIdentifiers = executionServiceInput.actionIdentifiers
+ }
+ return NodeSkipMessage(node, executionServiceInput, nodeOutput)
}
- return NodeSkipMessage(node, executionServiceInput, nodeOutput)
- }
override suspend fun executeNode(
node: Graph.Node,
val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
val nodeTemplateName = step.target!!
+
/** execute node template */
val executionServiceOutput = nodeTemplateExecutionService
- .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
+ .executeNodeTemplate(bluePrintRuntimeService, node.id, nodeTemplateName, nodeInput)
- return when (executionServiceOutput.status.message) {
- BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
- else -> EdgeLabel.SUCCESS
+ if (executionServiceOutput.status.message == BlueprintConstants.STATUS_FAILURE) {
+ // Clear step errors so that the workflow does not fail
+ bluePrintRuntimeService.getBlueprintError().stepErrors(node.id)?.clear()
+ return EdgeLabel.FAILURE
}
+
+ return EdgeLabel.SUCCESS
}
override suspend fun skipNode(