Refactoring to enable on_failure for imperative workflow
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / workflow-service / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / workflow / ImperativeWorkflowExecutionService.kt
index 6bee17f..29019b7 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
 
 import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.coroutineScope
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
-import org.onap.ccsdk.cds.controllerblueprints.core.*
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
+import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
+import org.onap.ccsdk.cds.controllerblueprints.core.checkNotEmpty
 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
-import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
-import org.onap.ccsdk.cds.controllerblueprints.core.service.*
-import org.springframework.beans.factory.config.ConfigurableBeanFactory
-import org.springframework.context.annotation.Scope
+import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintWorkflowExecutionService
+import org.onap.ccsdk.cds.controllerblueprints.core.isAcyclic
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBlueprintWorkFlowService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintRuntimeService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
+import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
+import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
 import org.springframework.stereotype.Service
+import kotlin.coroutines.CoroutineContext
 
 @Service("imperativeWorkflowExecutionService")
 class ImperativeWorkflowExecutionService(
-        private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>)
-    : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+    private val nodeTemplateExecutionService: NodeTemplateExecutionService
+) :
+    BlueprintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
 
-    override suspend fun executeBluePrintWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>,
-                                                  executionServiceInput: ExecutionServiceInput,
-                                                  properties: MutableMap<String, Any>): ExecutionServiceOutput {
+    override suspend fun executeBlueprintWorkflow(
+        bluePrintRuntimeService: BlueprintRuntimeService<*>,
+        executionServiceInput: ExecutionServiceInput,
+        properties: MutableMap<String, Any>
+    ): ExecutionServiceOutput {
 
         val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
 
@@ -44,23 +59,36 @@ class ImperativeWorkflowExecutionService(
 
         val graph = bluePrintContext.workflowByName(workflowName).asGraph()
 
-        return imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService,
-                executionServiceInput)
+        if (!graph.isAcyclic()) {
+            throw BlueprintException("Imperative workflow must be acyclic. Check on_success/on_failure for circular references")
+        }
+
+        return coroutineScope {
+            ImperativeBlueprintWorkflowService(
+                nodeTemplateExecutionService,
+                this.coroutineContext[MDCContext]
+            )
+        }.let { it.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput) }
     }
 }
 
-@Service
-@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService)
-    : AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
-    val log = logger(ImperativeBluePrintWorkflowService::class)
+open class ImperativeBlueprintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService, private val mdcContext: CoroutineContext?) :
+    AbstractBlueprintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
+
+    final override val coroutineContext: CoroutineContext
+        get() = mdcContext?.let { super.coroutineContext + it } ?: super.coroutineContext
+
+    val log = logger(ImperativeBlueprintWorkflowService::class)
 
-    lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
+    lateinit var bluePrintRuntimeService: BlueprintRuntimeService<*>
     lateinit var executionServiceInput: ExecutionServiceInput
     lateinit var workflowName: String
 
-    override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
-                                         input: ExecutionServiceInput): ExecutionServiceOutput {
+    override suspend fun executeWorkflow(
+        graph: Graph,
+        bluePrintRuntimeService: BlueprintRuntimeService<*>,
+        input: ExecutionServiceInput
+    ): ExecutionServiceOutput {
         this.graph = graph
         this.bluePrintRuntimeService = bluePrintRuntimeService
         this.executionServiceInput = input
@@ -72,7 +100,7 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
         if (!workflowActor.isClosedForSend) {
             workflowActor.send(startMessage)
         } else {
-            throw BluePrintProcessorException("workflow($workflowActor) actor is closed")
+            throw BlueprintProcessorException("workflow($workflowActor) actor is closed")
         }
         return output.await()
     }
@@ -86,13 +114,14 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
             if (exceptions.isNotEmpty()) {
                 exceptions.forEach {
                     val errorMessage = it.message ?: ""
-                    bluePrintRuntimeService.getBluePrintError().addError(errorMessage)
+                    bluePrintRuntimeService.getBlueprintError().addError(errorMessage, "workflow")
                     log.error("workflow($workflowId) exception :", it)
                 }
-                message = BluePrintConstants.STATUS_FAILURE
+                message = BlueprintConstants.STATUS_FAILURE
             } else {
-                message = BluePrintConstants.STATUS_SUCCESS
+                message = BlueprintConstants.STATUS_SUCCESS
             }
+            eventType = EventType.EVENT_COMPONENT_EXECUTED.name
         }
         return ExecutionServiceOutput().apply {
             commonHeader = executionServiceInput.commonHeader
@@ -101,52 +130,68 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
         }
     }
 
-    override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
-            : NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
-        val nodeOutput = ExecutionServiceOutput().apply {
-            commonHeader = executionServiceInput.commonHeader
-            actionIdentifiers = executionServiceInput.actionIdentifiers
+    override suspend fun prepareNodeExecutionMessage(node: Graph.Node):
+        NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+            val nodeOutput = ExecutionServiceOutput().apply {
+                commonHeader = executionServiceInput.commonHeader
+                actionIdentifiers = executionServiceInput.actionIdentifiers
+            }
+            return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
         }
-        return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
-    }
 
-    override suspend fun prepareNodeSkipMessage(node: Graph.Node)
-            : NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
-        val nodeOutput = ExecutionServiceOutput().apply {
-            commonHeader = executionServiceInput.commonHeader
-            actionIdentifiers = executionServiceInput.actionIdentifiers
+    override suspend fun prepareNodeSkipMessage(node: Graph.Node):
+        NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
+            val nodeOutput = ExecutionServiceOutput().apply {
+                commonHeader = executionServiceInput.commonHeader
+                actionIdentifiers = executionServiceInput.actionIdentifiers
+            }
+            return NodeSkipMessage(node, executionServiceInput, nodeOutput)
         }
-        return NodeSkipMessage(node, executionServiceInput, nodeOutput)
-    }
 
-    override suspend fun executeNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                     nodeOutput: ExecutionServiceOutput): EdgeLabel {
-        log.info("Executing workflow($workflowName[${this.workflowId}])'s step($${node.id})")
+    override suspend fun executeNode(
+        node: Graph.Node,
+        nodeInput: ExecutionServiceInput,
+        nodeOutput: ExecutionServiceOutput
+    ): EdgeLabel {
+        log.info("Executing workflow($workflowName[${this.workflowId}])'s step(${node.id})")
         val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
         checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
         val nodeTemplateName = step.target!!
+
         /** execute node template */
         val executionServiceOutput = nodeTemplateExecutionService
-                .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
+            .executeNodeTemplate(bluePrintRuntimeService, node.id, nodeTemplateName, nodeInput)
 
-        return when (executionServiceOutput.status.message) {
-            BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
-            else -> EdgeLabel.SUCCESS
+        if (executionServiceOutput.status.message == BlueprintConstants.STATUS_FAILURE) {
+            // Clear step errors so that the workflow does not fail
+            bluePrintRuntimeService.getBlueprintError().stepErrors(node.id)?.clear()
+            return EdgeLabel.FAILURE
         }
+
+        return EdgeLabel.SUCCESS
     }
 
-    override suspend fun skipNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                  nodeOutput: ExecutionServiceOutput): EdgeLabel {
+    override suspend fun skipNode(
+        node: Graph.Node,
+        nodeInput: ExecutionServiceInput,
+        nodeOutput: ExecutionServiceOutput
+    ): EdgeLabel {
         return EdgeLabel.SUCCESS
     }
 
-    override suspend fun cancelNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                    nodeOutput: ExecutionServiceOutput): EdgeLabel {
+    override suspend fun cancelNode(
+        node: Graph.Node,
+        nodeInput: ExecutionServiceInput,
+        nodeOutput: ExecutionServiceOutput
+    ): EdgeLabel {
         TODO("not implemented")
     }
 
-    override suspend fun restartNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                     nodeOutput: ExecutionServiceOutput): EdgeLabel {
+    override suspend fun restartNode(
+        node: Graph.Node,
+        nodeInput: ExecutionServiceInput,
+        nodeOutput: ExecutionServiceOutput
+    ): EdgeLabel {
         TODO("not implemented")
     }
-}
\ No newline at end of file
+}