Refactoring to enable on_failure for imperative workflow
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / workflow-service / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / workflow / ImperativeWorkflowExecutionService.kt
index 2aa4085..29019b7 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
 
 import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.coroutineScope
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
 import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
 import org.onap.ccsdk.cds.controllerblueprints.core.checkNotEmpty
 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
-import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.isAcyclic
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBluePrintWorkFlowService
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBlueprintWorkFlowService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintRuntimeService
 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
 import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
 import org.springframework.stereotype.Service
+import kotlin.coroutines.CoroutineContext
 
 @Service("imperativeWorkflowExecutionService")
 class ImperativeWorkflowExecutionService(
     private val nodeTemplateExecutionService: NodeTemplateExecutionService
 ) :
-    BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+    BlueprintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
 
-    override suspend fun executeBluePrintWorkflow(
-        bluePrintRuntimeService: BluePrintRuntimeService<*>,
+    override suspend fun executeBlueprintWorkflow(
+        bluePrintRuntimeService: BlueprintRuntimeService<*>,
         executionServiceInput: ExecutionServiceInput,
         properties: MutableMap<String, Any>
     ): ExecutionServiceOutput {
@@ -54,23 +59,34 @@ class ImperativeWorkflowExecutionService(
 
         val graph = bluePrintContext.workflowByName(workflowName).asGraph()
 
-        return ImperativeBluePrintWorkflowService(nodeTemplateExecutionService)
-                .executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput)
+        if (!graph.isAcyclic()) {
+            throw BlueprintException("Imperative workflow must be acyclic. Check on_success/on_failure for circular references")
+        }
+
+        return coroutineScope {
+            ImperativeBlueprintWorkflowService(
+                nodeTemplateExecutionService,
+                this.coroutineContext[MDCContext]
+            )
+        }.let { it.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput) }
     }
 }
 
-open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService) :
-    AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
+open class ImperativeBlueprintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService, private val mdcContext: CoroutineContext?) :
+    AbstractBlueprintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
 
-    val log = logger(ImperativeBluePrintWorkflowService::class)
+    final override val coroutineContext: CoroutineContext
+        get() = mdcContext?.let { super.coroutineContext + it } ?: super.coroutineContext
 
-    lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
+    val log = logger(ImperativeBlueprintWorkflowService::class)
+
+    lateinit var bluePrintRuntimeService: BlueprintRuntimeService<*>
     lateinit var executionServiceInput: ExecutionServiceInput
     lateinit var workflowName: String
 
     override suspend fun executeWorkflow(
         graph: Graph,
-        bluePrintRuntimeService: BluePrintRuntimeService<*>,
+        bluePrintRuntimeService: BlueprintRuntimeService<*>,
         input: ExecutionServiceInput
     ): ExecutionServiceOutput {
         this.graph = graph
@@ -84,7 +100,7 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
         if (!workflowActor.isClosedForSend) {
             workflowActor.send(startMessage)
         } else {
-            throw BluePrintProcessorException("workflow($workflowActor) actor is closed")
+            throw BlueprintProcessorException("workflow($workflowActor) actor is closed")
         }
         return output.await()
     }
@@ -98,12 +114,12 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
             if (exceptions.isNotEmpty()) {
                 exceptions.forEach {
                     val errorMessage = it.message ?: ""
-                    bluePrintRuntimeService.getBluePrintError().addError(errorMessage)
+                    bluePrintRuntimeService.getBlueprintError().addError(errorMessage, "workflow")
                     log.error("workflow($workflowId) exception :", it)
                 }
-                message = BluePrintConstants.STATUS_FAILURE
+                message = BlueprintConstants.STATUS_FAILURE
             } else {
-                message = BluePrintConstants.STATUS_SUCCESS
+                message = BlueprintConstants.STATUS_SUCCESS
             }
             eventType = EventType.EVENT_COMPONENT_EXECUTED.name
         }
@@ -115,22 +131,22 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
     }
 
     override suspend fun prepareNodeExecutionMessage(node: Graph.Node):
-            NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
-        val nodeOutput = ExecutionServiceOutput().apply {
-            commonHeader = executionServiceInput.commonHeader
-            actionIdentifiers = executionServiceInput.actionIdentifiers
+        NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+            val nodeOutput = ExecutionServiceOutput().apply {
+                commonHeader = executionServiceInput.commonHeader
+                actionIdentifiers = executionServiceInput.actionIdentifiers
+            }
+            return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
         }
-        return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
-    }
 
     override suspend fun prepareNodeSkipMessage(node: Graph.Node):
-            NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
-        val nodeOutput = ExecutionServiceOutput().apply {
-            commonHeader = executionServiceInput.commonHeader
-            actionIdentifiers = executionServiceInput.actionIdentifiers
+        NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+            val nodeOutput = ExecutionServiceOutput().apply {
+                commonHeader = executionServiceInput.commonHeader
+                actionIdentifiers = executionServiceInput.actionIdentifiers
+            }
+            return NodeSkipMessage(node, executionServiceInput, nodeOutput)
         }
-        return NodeSkipMessage(node, executionServiceInput, nodeOutput)
-    }
 
     override suspend fun executeNode(
         node: Graph.Node,
@@ -141,14 +157,18 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
         val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
         checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
         val nodeTemplateName = step.target!!
+
         /** execute node template */
         val executionServiceOutput = nodeTemplateExecutionService
-            .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
+            .executeNodeTemplate(bluePrintRuntimeService, node.id, nodeTemplateName, nodeInput)
 
-        return when (executionServiceOutput.status.message) {
-            BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
-            else -> EdgeLabel.SUCCESS
+        if (executionServiceOutput.status.message == BlueprintConstants.STATUS_FAILURE) {
+            // Clear step errors so that the workflow does not fail
+            bluePrintRuntimeService.getBlueprintError().stepErrors(node.id)?.clear()
+            return EdgeLabel.FAILURE
         }
+
+        return EdgeLabel.SUCCESS
     }
 
     override suspend fun skipNode(