Modify workflow execution service options. 32/93632/3
authorBrinda Santh <brindasanth@in.ibm.com>
Thu, 15 Aug 2019 16:43:41 +0000 (12:43 -0400)
committerBrinda Santh Muthuramalingam <brindasanth@in.ibm.com>
Fri, 16 Aug 2019 14:28:55 +0000 (14:28 +0000)
Change-Id: I629b30f9ff2b8e84d6ae952946608d9bb3437d4c
Issue-ID: CCSDK-1619
Signed-off-by: Brinda Santh <brindasanth@in.ibm.com>
components/model-catalog/blueprint-model/test-blueprint/baseconfiguration/Definitions/activation-blueprint.json
ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImpl.kt
ms/blueprintsprocessor/modules/services/workflow-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionService.kt
ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/BluePrintWorkflowExecutionServiceImplTest.kt
ms/blueprintsprocessor/modules/services/workflow-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/workflow/ImperativeWorkflowExecutionServiceTest.kt
ms/blueprintsprocessor/modules/services/workflow-service/src/test/resources/execution-input/imperative-test-input.json
ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/dsl/BluePrintServiceDSLBuilder.kt
ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt

index 112014b..70c1bc3 100644 (file)
             ]
           }
         }
+      },
+      "imperative-test-wf": {
+        "steps": {
+          "activate-step1": {
+            "description": "Activate CLI flow 1",
+            "target": "activate-cli",
+            "activities": [
+              {
+                "call_operation": "ComponentScriptExecutor.process"
+              }
+            ],
+            "on_success": [
+              "activate-step2"
+            ]
+          },
+          "activate-step2": {
+            "description": "Activate CLI flow 2",
+            "target": "activate-cli",
+            "activities": [
+              {
+                "call_operation": "ComponentScriptExecutor.process"
+              }
+            ],
+            "on_success": [
+              "activate-step3"
+            ]
+          },
+          "activate-step3": {
+            "description": "Activate CLI flow 3",
+            "target": "activate-cli",
+            "activities": [
+              {
+                "call_operation": "ComponentScriptExecutor.process"
+              }
+            ]
+          }
+        }
       }
     }
   }
index fcf0558..cde919c 100644 (file)
@@ -29,8 +29,9 @@ import org.springframework.stereotype.Service
 
 @Service("bluePrintWorkflowExecutionService")
 open class BluePrintWorkflowExecutionServiceImpl(
-    private val componentWorkflowExecutionService: ComponentWorkflowExecutionService,
-    private val dgWorkflowExecutionService: DGWorkflowExecutionService
+        private val componentWorkflowExecutionService: ComponentWorkflowExecutionService,
+        private val dgWorkflowExecutionService: DGWorkflowExecutionService,
+        private val imperativeWorkflowExecutionService: ImperativeWorkflowExecutionService
 ) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
 
     private val log = LoggerFactory.getLogger(BluePrintWorkflowExecutionServiceImpl::class.java)!!
@@ -51,28 +52,37 @@ open class BluePrintWorkflowExecutionServiceImpl(
         val input = executionServiceInput.payload.get("$workflowName-request")
         bluePrintRuntimeService.assignWorkflowInputs(workflowName, input)
 
-        // Get the DG Node Template
-        val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName)
+        val workflow = bluePrintContext.workflowByName(workflowName)
 
-        val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom
+        val steps = workflow.steps ?: throw BluePrintProcessorException("could't get steps for workflow($workflowName)")
 
-        log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)")
-
-        val executionServiceOutput: ExecutionServiceOutput = when {
-            derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> {
-                componentWorkflowExecutionService
-                    .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
-            }
-            derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> {
-                dgWorkflowExecutionService
+        /** If workflow has multiple steps, then it is imperative workflow */
+        val executionServiceOutput: ExecutionServiceOutput = if (steps.size > 1) {
+            imperativeWorkflowExecutionService
                     .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
-            }
-            else -> {
-                throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " +
-                        "to node template($nodeTemplateName) derived from($derivedFrom)")
+        } else {
+            // Get the DG Node Template
+            val nodeTemplateName = bluePrintContext.workflowFirstStepNodeTemplate(workflowName)
+
+            val derivedFrom = bluePrintContext.nodeTemplateNodeType(nodeTemplateName).derivedFrom
+
+            log.info("Executing workflow($workflowName) NodeTemplate($nodeTemplateName), derived from($derivedFrom)")
+            /** Return ExecutionServiceOutput based on DG node or Component Node */
+            when {
+                derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_COMPONENT, true) -> {
+                    componentWorkflowExecutionService
+                            .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
+                }
+                derivedFrom.startsWith(BluePrintConstants.MODEL_TYPE_NODE_WORKFLOW, true) -> {
+                    dgWorkflowExecutionService
+                            .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, properties)
+                }
+                else -> {
+                    throw BluePrintProcessorException("couldn't execute workflow($workflowName) step mapped " +
+                            "to node template($nodeTemplateName) derived from($derivedFrom)")
+                }
             }
         }
-
         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
         // Resolve Workflow Outputs
index e7e5fe6..2a14be2 100644 (file)
@@ -19,12 +19,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
 import kotlinx.coroutines.CompletableDeferred
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
-import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.*
 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.*
 import org.springframework.beans.factory.config.ConfigurableBeanFactory
 import org.springframework.context.annotation.Scope
@@ -32,7 +31,7 @@ import org.springframework.stereotype.Service
 
 @Service("imperativeWorkflowExecutionService")
 class ImperativeWorkflowExecutionService(
-        private val bluePrintWorkFlowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>)
+        private val imperativeBluePrintWorkflowService: BluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>)
     : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
 
     override suspend fun executeBluePrintWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>,
@@ -46,7 +45,8 @@ class ImperativeWorkflowExecutionService(
         val graph = bluePrintContext.workflowByName(workflowName).asGraph()
 
         val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
-        bluePrintWorkFlowService.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput, deferredOutput)
+        imperativeBluePrintWorkflowService.executeWorkflow(graph, bluePrintRuntimeService,
+                executionServiceInput, deferredOutput)
         return deferredOutput.await()
     }
 }
@@ -59,6 +59,7 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
 
     lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
     lateinit var executionServiceInput: ExecutionServiceInput
+    lateinit var workflowName: String
     lateinit var deferredExecutionServiceOutput: CompletableDeferred<ExecutionServiceOutput>
 
     override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
@@ -67,77 +68,81 @@ open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionS
         this.graph = graph
         this.bluePrintRuntimeService = bluePrintRuntimeService
         this.executionServiceInput = input
+        this.workflowName = this.executionServiceInput.actionIdentifiers.actionName
         this.deferredExecutionServiceOutput = output
         this.workflowId = bluePrintRuntimeService.id()
         val startMessage = WorkflowExecuteMessage(input, output)
-        workflowActor.send(startMessage)
+        workflowActor().send(startMessage)
     }
 
     override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel {
         return EdgeLabel.SUCCESS
     }
 
-    override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput {
+    override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): ExecutionServiceOutput {
+        val wfStatus = if (exception != null) {
+            val status = Status()
+            status.message = BluePrintConstants.STATUS_FAILURE
+            status.errorMessage = exception.message
+            status
+        } else {
+            val status = Status()
+            status.message = BluePrintConstants.STATUS_SUCCESS
+            status
+        }
         return ExecutionServiceOutput().apply {
             commonHeader = executionServiceInput.commonHeader
             actionIdentifiers = executionServiceInput.actionIdentifiers
+            status = wfStatus
         }
     }
 
     override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
             : NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
-        val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
-        return NodeExecuteMessage(node, executionServiceInput, deferredOutput)
+        val nodeOutput = ExecutionServiceOutput().apply {
+            commonHeader = executionServiceInput.commonHeader
+            actionIdentifiers = executionServiceInput.actionIdentifiers
+        }
+        return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
     }
 
     override suspend fun prepareNodeSkipMessage(node: Graph.Node)
             : NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
-        val deferredOutput = CompletableDeferred<ExecutionServiceOutput>()
-        return NodeSkipMessage(node, executionServiceInput, deferredOutput)
+        val nodeOutput = ExecutionServiceOutput().apply {
+            commonHeader = executionServiceInput.commonHeader
+            actionIdentifiers = executionServiceInput.actionIdentifiers
+        }
+        return NodeSkipMessage(node, executionServiceInput, nodeOutput)
     }
 
     override suspend fun executeNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                     deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
-                                     deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
-        try {
-            val nodeTemplateName = node.id
-            /** execute node template */
-            val executionServiceOutput = nodeTemplateExecutionService
-                    .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
-            val edgeStatus = when (executionServiceOutput.status.message) {
-                BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
-                else -> EdgeLabel.SUCCESS
-            }
-            /** set deferred output and status */
-            deferredNodeOutput.complete(executionServiceOutput)
-            deferredNodeStatus.complete(edgeStatus)
-        } catch (e: Exception) {
-            log.error("failed in executeNode($node)", e)
-            deferredNodeOutput.completeExceptionally(e)
-            deferredNodeStatus.complete(EdgeLabel.FAILURE)
+                                     nodeOutput: ExecutionServiceOutput): EdgeLabel {
+        log.info("Executing workflow($workflowName[${this.workflowId}])'s step($${node.id})")
+        val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
+        checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
+        val nodeTemplateName = step.target!!
+        /** execute node template */
+        val executionServiceOutput = nodeTemplateExecutionService
+                .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
+
+        return when (executionServiceOutput.status.message) {
+            BluePrintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
+            else -> EdgeLabel.SUCCESS
         }
     }
 
     override suspend fun skipNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                  deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
-                                  deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
-        val executionServiceOutput = ExecutionServiceOutput().apply {
-            commonHeader = nodeInput.commonHeader
-            actionIdentifiers = nodeInput.actionIdentifiers
-        }
-        deferredNodeOutput.complete(executionServiceOutput)
-        deferredNodeStatus.complete(EdgeLabel.SUCCESS)
+                                  nodeOutput: ExecutionServiceOutput): EdgeLabel {
+        return EdgeLabel.SUCCESS
     }
 
     override suspend fun cancelNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                    deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
-                                    deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+                                    nodeOutput: ExecutionServiceOutput): EdgeLabel {
         TODO("not implemented")
     }
 
     override suspend fun restartNode(node: Graph.Node, nodeInput: ExecutionServiceInput,
-                                     deferredNodeOutput: CompletableDeferred<ExecutionServiceOutput>,
-                                     deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+                                     nodeOutput: ExecutionServiceOutput): EdgeLabel {
         TODO("not implemented")
     }
 }
\ No newline at end of file
index 3c74072..436de1b 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
 
+import io.mockk.every
+import io.mockk.mockkObject
+import io.mockk.unmockkAll
 import kotlinx.coroutines.runBlocking
+import org.junit.After
 import org.junit.Before
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.MockComponentFunction
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
@@ -29,7 +34,6 @@ import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyS
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
 import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.context.ApplicationContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.junit4.SpringRunner
 import kotlin.test.assertEquals
@@ -41,32 +45,53 @@ import kotlin.test.assertNotNull
 @ContextConfiguration(classes = [WorkflowServiceConfiguration::class])
 class BluePrintWorkflowExecutionServiceImplTest {
 
-    @Autowired
-    lateinit var applicationContext: ApplicationContext
-
     @Autowired
     lateinit var bluePrintWorkflowExecutionService: BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>
 
     @Before
     fun init() {
-        BluePrintDependencyService.inject(applicationContext)
+        mockkObject(BluePrintDependencyService)
+        every { BluePrintDependencyService.applicationContext.getBean(any()) } returns MockComponentFunction()
+    }
+
+    @After
+    fun afterTests() {
+        unmockkAll()
     }
 
     @Test
     fun testBluePrintWorkflowExecutionService() {
         runBlocking {
             val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234",
-                "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
+                    "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
 
             val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/resource-assignment-input.json",
-                ExecutionServiceInput::class.java)!!
+                    ExecutionServiceInput::class.java)!!
 
             val executionServiceOutput = bluePrintWorkflowExecutionService
-                .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
+                    .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
 
             assertNotNull(executionServiceOutput, "failed to get response")
             assertEquals(BluePrintConstants.STATUS_SUCCESS, executionServiceOutput.status.message,
-                "failed to get successful response")
+                    "failed to get successful response")
+        }
+    }
+
+    @Test
+    fun testImperativeBluePrintWorkflowExecutionService() {
+        runBlocking {
+            val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234",
+                    "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
+
+            val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/imperative-test-input.json",
+                    ExecutionServiceInput::class.java)!!
+
+            val executionServiceOutput = bluePrintWorkflowExecutionService
+                    .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
+
+            assertNotNull(executionServiceOutput, "failed to get response")
+            assertEquals(BluePrintConstants.STATUS_SUCCESS, executionServiceOutput.status.message,
+                    "failed to get successful response")
         }
     }
 
@@ -75,13 +100,13 @@ class BluePrintWorkflowExecutionServiceImplTest {
         assertFailsWith(exceptionClass = BluePrintProcessorException::class) {
             runBlocking {
                 val bluePrintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime("1234",
-                    "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
+                        "./../../../../../components/model-catalog/blueprint-model/test-blueprint/baseconfiguration")
                 //service input will have a mislabeled input params, we are expecting to get an error when that happens with a useful error message
                 val executionServiceInput = JacksonUtils.readValueFromClassPathFile("execution-input/resource-assignment-input-missing-resource_assignment_request.json",
-                    ExecutionServiceInput::class.java)!!
+                        ExecutionServiceInput::class.java)!!
 
                 val executionServiceOutput = bluePrintWorkflowExecutionService
-                    .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
+                        .executeBluePrintWorkflow(bluePrintRuntimeService, executionServiceInput, hashMapOf())
             }
         }
     }
index 301fc34..becd228 100644 (file)
@@ -27,7 +27,9 @@ import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.nodeTypeCompone
 import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.MockComponentFunction
 import org.onap.ccsdk.cds.blueprintsprocessor.services.workflow.mock.mockNodeTemplateComponentScriptExecutor
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintTypes
+import org.onap.ccsdk.cds.controllerblueprints.core.data.ServiceTemplate
 import org.onap.ccsdk.cds.controllerblueprints.core.dsl.serviceTemplate
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedPathName
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintContext
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
@@ -37,6 +39,7 @@ import kotlin.test.Test
 import kotlin.test.assertNotNull
 
 class ImperativeWorkflowExecutionServiceTest {
+    val log = logger(ImperativeWorkflowExecutionServiceTest::class)
 
     @Before
     fun init() {
@@ -49,37 +52,40 @@ class ImperativeWorkflowExecutionServiceTest {
         unmockkAll()
     }
 
-    @Test
-    fun testImperativeExecutionService() {
-        runBlocking {
-            val serviceTemplate = serviceTemplate("imperative-test", "1.0.0",
-                    "brindasanth@onap.com", "tosca") {
+    fun mockServiceTemplate(): ServiceTemplate {
+        return serviceTemplate("imperative-test", "1.0.0",
+                "brindasanth@onap.com", "tosca") {
 
-                topologyTemplate {
-                    nodeTemplate(mockNodeTemplateComponentScriptExecutor("resolve-config",
-                            "cba.wt.imperative.test.ResolveConfig"))
-                    nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config",
-                            "cba.wt.imperative.test.ActivateConfig"))
-                    nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config-rollback",
-                            "cba.wt.imperative.test.ActivateConfigRollback"))
-                    nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-licence",
-                            "cba.wt.imperative.test.ActivateLicence"))
+            topologyTemplate {
+                nodeTemplate(mockNodeTemplateComponentScriptExecutor("resolve-config",
+                        "cba.wt.imperative.test.ResolveConfig"))
+                nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config",
+                        "cba.wt.imperative.test.ActivateConfig"))
+                nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-config-rollback",
+                        "cba.wt.imperative.test.ActivateConfigRollback"))
+                nodeTemplate(mockNodeTemplateComponentScriptExecutor("activate-licence",
+                        "cba.wt.imperative.test.ActivateLicence"))
 
-                    workflow("test-wf", "Test Imperative flow") {
-                        step("resolve-config", "resolve-config", "") {
-                            success("activate-config")
-                        }
-                        step("activate-config", "activate-config", "") {
-                            success("activate-licence")
-                            failure("activate-config-rollback")
-                        }
-                        step("activate-config-rollback", "activate-config-rollback", "")
-                        step("activate-licence", "activate-licence", "")
+                workflow("imperative-test-wf", "Test Imperative flow") {
+                    step("resolve-config", "resolve-config", "") {
+                        success("activate-config")
                     }
+                    step("activate-config", "activate-config", "") {
+                        success("activate-licence")
+                        failure("activate-config-rollback")
+                    }
+                    step("activate-config-rollback", "activate-config-rollback", "")
+                    step("activate-licence", "activate-licence", "")
                 }
-                nodeType(BluePrintTypes.nodeTypeComponentScriptExecutor())
             }
+            nodeType(BluePrintTypes.nodeTypeComponentScriptExecutor())
+        }
+    }
 
+    @Test
+    fun testImperativeExecutionService() {
+        runBlocking {
+            val serviceTemplate = mockServiceTemplate()
             val bluePrintContext = BluePrintContext(serviceTemplate)
             bluePrintContext.rootPath = normalizedPathName(".")
             bluePrintContext.entryDefinition = "cba.imperative.test.ImperativeTestDefinitions.kt"
index 188e840..d3495c4 100644 (file)
@@ -7,11 +7,11 @@
   "actionIdentifiers": {
     "blueprintName": "imperative-test",
     "blueprintVersion": "1.0.0",
-    "actionName": "test-wf",
+    "actionName": "imperative-test-wf",
     "mode": "sync"
   },
   "payload": {
-    "test-wf-request": {
+    "imperative-test-wf-request": {
       "hostname": "localhost"
     }
   }
index 06d3421..259efbf 100644 (file)
@@ -26,7 +26,7 @@ class ServiceTemplateBuilder(private val name: String,
                              private val author: String,
                              private val tags: String) {
     private var serviceTemplate = ServiceTemplate()
-    private lateinit var topologyTemplate: TopologyTemplate
+    private var topologyTemplate: TopologyTemplate? = null
     private var metadata: MutableMap<String, String> = hashMapOf()
     private var dslDefinitions: MutableMap<String, JsonNode>? = null
     private var imports: MutableList<ImportDefinition> = mutableListOf()
index 91c2bcb..9051502 100644 (file)
@@ -36,24 +36,20 @@ interface BluePrintWorkFlowService<In, Out> {
 
     suspend fun initializeWorkflow(input: In): EdgeLabel
 
-    suspend fun prepareWorkflowOutput(): Out
+    suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out
 
     /** Prepare the message for the Node */
     suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
 
     suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
 
-    suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
-                            deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+    suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
 
-    suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
-                         deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+    suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
 
-    suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
-                           deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+    suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
 
-    suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
-                            deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+    suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
 
 }
 
@@ -71,17 +67,13 @@ sealed class NodeMessage<In, Out>
 
 class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
 
-class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
-                                  val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
 
-class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
-                                  val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
 
-class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
-                               val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
 
-class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
-                                 val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
 
 enum class EdgeAction(val id: String) {
     EXECUTE("execute"),
@@ -105,18 +97,14 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
     fun cancel() {
         log.info("Received workflow($workflowId) cancel request")
         job.cancel()
-        throw CancellationException("Workflow($workflowId) cancelled as requested ...")
+        throw CancellationException("Workflow($workflowId) cancelled as requested")
     }
 
-    val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
-
-        /** Send message from workflow actor to node actor */
-        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
-            nodeActor.send(nodeMessage)
-        }
-
+    fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
         /** Process the workflow execution message */
         suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
+
+            val nodeActor = nodeActor()
             // Prepare Workflow and Populate the Initial store
             initializeWorkflow(workflowExecuteMessage.input)
 
@@ -124,14 +112,18 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
             // Prepare first node message and Send NodeExecuteMessage
             // Start node doesn't wait for any nodes, so we can pass Execute message directly
             val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
-            sendNodeMessage(nodeExecuteMessage)
-            log.debug("First node triggered successfully, waiting for response")
-
+            /** Send message from workflow actor to node actor */
+            launch {
+                nodeActor.send(nodeExecuteMessage)
+            }
             // Wait for workflow completion or Error
             nodeActor.invokeOnClose { exception ->
                 launch {
-                    log.debug("End Node Completed, processing completion message")
-                    val workflowOutput = prepareWorkflowOutput()
+                    log.info("End Node Completed, processing completion message")
+                    val bluePrintProcessorException: BluePrintProcessorException? =
+                            if (exception != null) BluePrintProcessorException(exception) else null
+
+                    val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException)
                     workflowExecuteMessage.output.complete(workflowOutput)
                     channel.close(exception)
                 }
@@ -161,7 +153,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
     }
 
 
-    private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+    private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
 
         /** Send message to process from one state to other state */
         fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
@@ -228,7 +220,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
             }
         }
 
-        fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
+        suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
             val node = message.node
             node.status = NodeStatus.EXECUTING
             val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
@@ -237,9 +229,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
             } else {
                 log.debug("##### Processing workflow($workflowId) node($node) #####")
                 // Call the Extension function and get the next Edge state.
-                val deferredNodeState = CompletableDeferred<EdgeLabel>()
-                executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
-                deferredNodeState.await()
+                executeNode(node, message.nodeInput, message.nodeOutput)
             }
             // Update Node Completed
             node.status = NodeStatus.EXECUTED
@@ -263,7 +253,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
             }
         }
 
-        fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
+        suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
             val node = message.node
             val incomingEdges = graph.incomingEdges(node.id)
             // Check All Incoming Nodes Skipped
@@ -275,9 +265,7 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
             if (nonSkippedEdges.isEmpty()) {
                 log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
                 // Call the Extension Function
-                val deferredNodeState = CompletableDeferred<EdgeLabel>()
-                skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
-                val nodeState = deferredNodeState.await()
+                val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
                 log.info("Skip Node($node) -> Executed State($nodeState)")
                 // Mark the Current node as Skipped
                 node.status = NodeStatus.SKIPPED
@@ -303,21 +291,37 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
             when (nodeMessage) {
                 is NodeReadyMessage<In, Out> -> {
                     // Blocking call
-                    readyNodeWorker(nodeMessage)
+                    try {
+                        readyNodeWorker(nodeMessage)
+                    } catch (e: Exception) {
+                        channel.close(e)
+                    }
                 }
                 is NodeExecuteMessage<In, Out> -> {
                     launch {
-                        executeNodeWorker(nodeMessage)
+                        try {
+                            executeNodeWorker(nodeMessage)
+                        } catch (e: Exception) {
+                            channel.close(e)
+                        }
                     }
                 }
                 is NodeSkipMessage<In, Out> -> {
                     launch {
-                        skipNodeWorker(nodeMessage)
+                        try {
+                            skipNodeWorker(nodeMessage)
+                        } catch (e: Exception) {
+                            channel.close(e)
+                        }
                     }
                 }
                 is NodeRestartMessage<In, Out> -> {
                     launch {
-                        restartNodeWorker(nodeMessage)
+                        try {
+                            restartNodeWorker(nodeMessage)
+                        } catch (e: Exception) {
+                            channel.close(e)
+                        }
                     }
                 }
             }
@@ -330,6 +334,6 @@ abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BlueP
         this.graph = graph
         this.workflowId = bluePrintRuntimeService.id()
         val startMessage = WorkflowExecuteMessage(input, output)
-        workflowActor.send(startMessage)
+        workflowActor().send(startMessage)
     }
 }
\ No newline at end of file
index 62cb108..b8d8cea 100644 (file)
@@ -22,6 +22,7 @@ import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.runBlocking
 import org.junit.Test
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
 import org.onap.ccsdk.cds.controllerblueprints.core.toGraph
@@ -134,49 +135,41 @@ class TestBluePrintWorkFlowService
 
     override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
             : NodeExecuteMessage<String, String> {
-        val deferredNodeOutput = CompletableDeferred<String>()
-        val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput)
-        return nodeExecuteMessage
+        return NodeExecuteMessage(node, "$node Input", "")
     }
 
     override suspend fun executeNode(node: Graph.Node, nodeInput: String,
-                                     deferredNodeOutput: CompletableDeferred<String>,
-                                     deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+                                     nodeOutput: String): EdgeLabel {
 //        val random = (1..10).random() * 1000
 //        println("will reply in $random ms")
 //        kotlinx.coroutines.delay(random.toLong())
         val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
-        deferredNodeStatus.complete(status)
-        deferredNodeOutput.complete("$node, Output: $nodeInput output")
+        return status
     }
 
-    override suspend fun prepareNodeSkipMessage(node: Graph.Node)
-            : NodeSkipMessage<String, String> {
-        val deferredNodeOutput = CompletableDeferred<String>()
-        val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput)
+    override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> {
+        val nodeOutput = ""
+        val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", nodeOutput)
         return nodeSkipMessage
     }
 
     override suspend fun skipNode(node: Graph.Node, nodeInput: String,
-                                  deferredNodeOutput: CompletableDeferred<String>,
-                                  deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+                                  nodeOutput: String): EdgeLabel {
         val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
-        deferredNodeStatus.complete(status)
+        return status
     }
 
     override suspend fun cancelNode(node: Graph.Node, nodeInput: String,
-                                    deferredNodeOutput: CompletableDeferred<String>,
-                                    deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+                                    nodeOutput: String): EdgeLabel {
         TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
     }
 
     override suspend fun restartNode(node: Graph.Node, nodeInput: String,
-                                     deferredNodeOutput: CompletableDeferred<String>,
-                                     deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+                                     nodeOutput: String): EdgeLabel {
         TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
     }
 
-    override suspend fun prepareWorkflowOutput(): String {
+    override suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): String {
         return "Final Response"
     }
 }
\ No newline at end of file