Merge "Add imperative workflow service."
authorDan Timoney <dtimoney@att.com>
Fri, 16 Aug 2019 16:14:48 +0000 (16:14 +0000)
committerGerrit Code Review <gerrit@onap.org>
Fri, 16 Aug 2019 16:14:48 +0000 (16:14 +0000)
ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt [new file with mode: 0644]
ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt [new file with mode: 0644]

diff --git a/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt b/ms/controllerblueprints/modules/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowService.kt
new file mode 100644 (file)
index 0000000..019f318
--- /dev/null
@@ -0,0 +1,340 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.controllerblueprints.core.service
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.actor
+import kotlinx.coroutines.channels.consumeEach
+import org.onap.ccsdk.cds.controllerblueprints.core.*
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
+import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
+import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
+import kotlin.coroutines.CoroutineContext
+
+interface BluePrintWorkFlowService<In, Out> {
+
+    /** Executes imperative workflow for the bluePrintRuntimeService [bluePrintRuntimeService] and workflow
+     * input [input], response will be retrieve from output [output]*/
+    suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>,
+                                input: In, output: CompletableDeferred<Out>)
+
+    suspend fun initializeWorkflow(input: In): EdgeLabel
+
+    suspend fun prepareWorkflowOutput(): Out
+
+    /** Prepare the message for the Node */
+    suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
+
+    suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
+
+    suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+                            deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+    suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+                         deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+    suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+                           deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+    suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
+                            deferredNodeStatus: CompletableDeferred<EdgeLabel>)
+
+}
+
+/** Workflow Message Types */
+sealed class WorkflowMessage<In, Out>
+
+class WorkflowExecuteMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+class WorkflowCancelMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+class WorkflowRestartMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
+
+/** Node Message Types */
+sealed class NodeMessage<In, Out>
+
+class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
+
+class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+                                  val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+                                  val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+                               val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
+                                 val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
+
+enum class EdgeAction(val id: String) {
+    EXECUTE("execute"),
+    SKIP("skip")
+}
+
+/** Abstract workflow service implementation */
+abstract class AbstractBluePrintWorkFlowService<In, Out>(private val graph: Graph)
+    : CoroutineScope, BluePrintWorkFlowService<In, Out> {
+
+    private val log = logger(AbstractBluePrintWorkFlowService::class)
+
+    private val job = Job()
+
+    lateinit var workflowId: String
+
+    final override val coroutineContext: CoroutineContext
+        get() = job + CoroutineName("Wf")
+
+    val root = graph.startNodes()
+
+    fun cancel() {
+        log.info("Received workflow($workflowId) cancel request")
+        job.cancel()
+        throw CancellationException("Workflow($workflowId) cancelled as requested ...")
+    }
+
+    val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+
+        /** Send message from workflow actor to node actor */
+        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
+            nodeActor.send(nodeMessage)
+        }
+
+        /** Process the workflow execution message */
+        suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
+            // Prepare Workflow and Populate the Initial store
+            initializeWorkflow(workflowExecuteMessage.input)
+
+            val startNode = root.first()
+            // Prepare first node message and Send NodeExecuteMessage
+            // Start node doesn't wait for any nodes, so we can pass Execute message directly
+            val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
+            sendNodeMessage(nodeExecuteMessage)
+            log.debug("First node triggered successfully, waiting for response")
+
+            // Wait for workflow completion or Error
+            nodeActor.invokeOnClose { exception ->
+                launch {
+                    log.debug("End Node Completed, processing completion message")
+                    val workflowOutput = prepareWorkflowOutput()
+                    workflowExecuteMessage.output.complete(workflowOutput)
+                    channel.close(exception)
+                }
+            }
+        }
+
+        /** Process each actor message received based on type */
+        consumeEach { message ->
+            when (message) {
+                is WorkflowExecuteMessage<In, Out> -> {
+                    launch {
+                        executeMessageActor(message)
+                    }
+                }
+                is WorkflowRestartMessage<In, Out> -> {
+                    launch {
+                        TODO("")
+                    }
+                }
+                is WorkflowCancelMessage<In, Out> -> {
+                    launch {
+                        TODO("")
+                    }
+                }
+            }
+        }
+    }
+
+
+    private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
+
+        /** Send message to process from one state to other state */
+        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
+            channel.send(nodeMessage)
+        }
+
+        /** Process the cascade node processing, based on the previous state of the node */
+        fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
+            // Process only Next Success Node
+            val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
+            log.debug("Next Edges :$stateEdges")
+            if (stateEdges.isNotEmpty()) {
+                stateEdges.forEach { stateEdge ->
+                    // Prepare next node ready message and Send NodeReadyMessage
+                    val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
+                    sendNodeMessage(nodeReadyMessage)
+                }
+            }
+        }
+
+        suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
+            val edge = message.fromEdge
+            val node = edge.target
+            // Check if current edge action is Skip or Execute
+            when (message.edgeAction) {
+                EdgeAction.SKIP -> {
+                    val skipMessage = prepareNodeSkipMessage(node)
+                    sendNodeMessage(skipMessage)
+                }
+                EdgeAction.EXECUTE -> {
+                    val nodeExecuteMessage = prepareNodeExecutionMessage(node)
+                    sendNodeMessage(nodeExecuteMessage)
+                }
+            }
+        }
+
+        suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
+            val edge = message.fromEdge
+            val node = edge.target
+            log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
+            // Update the current incoming edge status to executed or skipped
+            when (message.edgeAction) {
+                EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
+                EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
+            }
+            val incomingEdges = graph.incomingEdges(node.id)
+            if (incomingEdges.size > 1) {
+                // Check all incoming edges executed or skipped
+                val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
+                if (notCompletedEdges.isEmpty()) {
+                    // Possibility of skip edge action performed at last, but other edges have execute action.
+                    val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
+                    val newMessage = if (executePresent.isNotEmpty()) {
+                        NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
+                    } else {
+                        message
+                    }
+                    triggerToExecuteOrSkip(newMessage)
+                } else {
+                    log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
+                }
+            } else {
+                triggerToExecuteOrSkip(message)
+            }
+        }
+
+        fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
+            val node = message.node
+            node.status = NodeStatus.EXECUTING
+            val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
+                    || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
+                EdgeLabel.SUCCESS
+            } else {
+                log.debug("##### Processing workflow($workflowId) node($node) #####")
+                // Call the Extension function and get the next Edge state.
+                val deferredNodeState = CompletableDeferred<EdgeLabel>()
+                executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
+                deferredNodeState.await()
+            }
+            // Update Node Completed
+            node.status = NodeStatus.EXECUTED
+            log.info("Execute Node($node) -> Executed State($nodeState)")
+
+            // If End Node, Send End Message
+            if (graph.isEndNode(node)) {
+                // Close the current channel
+                channel.close()
+            } else {
+                val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
+                log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
+                // Process Skip Edges
+                skippingEdges.forEach { skippingEdge ->
+                    // Prepare next node ready message and Send NodeReadyMessage
+                    val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
+                    sendNodeMessage(nodeReadyMessage)
+                }
+                // Process Success Node
+                processNextNodes(node, nodeState)
+            }
+        }
+
+        fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
+            val node = message.node
+            val incomingEdges = graph.incomingEdges(node.id)
+            // Check All Incoming Nodes Skipped
+            val nonSkippedEdges = incomingEdges.filter {
+                it.status == EdgeStatus.NOT_STARTED
+            }
+            log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")
+
+            if (nonSkippedEdges.isEmpty()) {
+                log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
+                // Call the Extension Function
+                val deferredNodeState = CompletableDeferred<EdgeLabel>()
+                skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
+                val nodeState = deferredNodeState.await()
+                log.info("Skip Node($node) -> Executed State($nodeState)")
+                // Mark the Current node as Skipped
+                node.status = NodeStatus.SKIPPED
+                // Look for next possible skip nodes
+                graph.outgoingEdges(node.id).forEach { outgoingEdge ->
+                    val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
+                    sendNodeMessage(nodeReadyMessage)
+                }
+            }
+        }
+
+        fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
+            TODO()
+        }
+
+        fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
+            channel.close()
+            throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
+        }
+
+        /** Process each actor message received based on type **/
+        consumeEach { nodeMessage ->
+            when (nodeMessage) {
+                is NodeReadyMessage<In, Out> -> {
+                    // Blocking call
+                    readyNodeWorker(nodeMessage)
+                }
+                is NodeExecuteMessage<In, Out> -> {
+                    launch {
+                        executeNodeWorker(nodeMessage)
+                    }
+                }
+                is NodeSkipMessage<In, Out> -> {
+                    launch {
+                        skipNodeWorker(nodeMessage)
+                    }
+                }
+                is NodeRestartMessage<In, Out> -> {
+                    launch {
+                        restartNodeWorker(nodeMessage)
+                    }
+                }
+            }
+        }
+    }
+
+
+    override suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In, output: CompletableDeferred<Out>) {
+        log.info("Executing Graph : $graph")
+        this.workflowId = bluePrintRuntimeService.id()
+        validateWorkflow()
+        val startMessage = WorkflowExecuteMessage(input, output)
+        workflowActor.send(startMessage)
+    }
+
+    open fun validateWorkflow() {
+        //check(!graph.findCycles().isNotEmpty()) { "Graph is cyclic, Cycle is not supported" }
+    }
+}
\ No newline at end of file
diff --git a/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt b/ms/controllerblueprints/modules/blueprint-core/src/test/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintWorkflowServiceTest.kt
new file mode 100644 (file)
index 0000000..7cb6492
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+ *  Copyright © 2019 IBM.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.controllerblueprints.core.service
+
+import io.mockk.every
+import io.mockk.mockk
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
+import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
+import org.onap.ccsdk.cds.controllerblueprints.core.toGraph
+import kotlin.test.assertNotNull
+
+class BluePrintWorkflowServiceTest {
+    @Test
+    fun testSimpleFlow() {
+        runBlocking {
+            val graph = "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>E/SUCCESS, E>END/SUCCESS]"
+                    .toGraph()
+            val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+            simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
+            val deferredOutput = CompletableDeferred<String>()
+            val input = "123456"
+            simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+            val response = deferredOutput.await()
+            assertNotNull(response, "failed to get response")
+        }
+    }
+
+    @Test
+    fun testConditionalFlow() {
+        runBlocking {
+            val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+                    .toGraph()
+            val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+            simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
+            val deferredOutput = CompletableDeferred<String>()
+            val input = "123456"
+            simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+            val response = deferredOutput.await()
+            assertNotNull(response, "failed to get response")
+        }
+    }
+
+    @Test
+    fun testBothConditionalFlow() {
+        runBlocking {
+            // Failure Flow
+            val failurePatGraph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+                    .toGraph()
+            val failurePathWorkflow = TestBluePrintWorkFlowService(failurePatGraph)
+            failurePathWorkflow.simulatedState = prepareSimulation(arrayListOf("B", "C", "D", "E"),
+                    arrayListOf("A"))
+            val failurePathWorkflowDeferredOutput = CompletableDeferred<String>()
+            val failurePathWorkflowInput = "123456"
+            failurePathWorkflow.executeWorkflow(mockBluePrintRuntimeService(), failurePathWorkflowInput, failurePathWorkflowDeferredOutput)
+            val failurePathResponse = failurePathWorkflowDeferredOutput.await()
+            assertNotNull(failurePathResponse, "failed to get response")
+        }
+    }
+
+    @Test
+    fun testMultipleSkipFlow() {
+        runBlocking {
+            val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, C>D/SUCCESS, D>E/SUCCESS, B>E/SUCCESS, E>END/SUCCESS]"
+                    .toGraph()
+            val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+            simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D", "E"), null)
+            val deferredOutput = CompletableDeferred<String>()
+            val input = "123456"
+            simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+            val response = deferredOutput.await()
+            assertNotNull(response, "failed to get response")
+        }
+    }
+
+    @Test
+    fun testParallelFlow() {
+        runBlocking {
+            val graph = "[START>A/SUCCESS, A>B/SUCCESS, A>C/SUCCESS, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]"
+                    .toGraph()
+            val simpleWorkflow = TestBluePrintWorkFlowService(graph)
+            simpleWorkflow.simulatedState = prepareSimulation(arrayListOf("A", "B", "C", "D"), null)
+            val deferredOutput = CompletableDeferred<String>()
+            val input = "123456"
+            simpleWorkflow.executeWorkflow(mockBluePrintRuntimeService(), input, deferredOutput)
+            val response = deferredOutput.await()
+            assertNotNull(response, "failed to get response")
+        }
+    }
+
+    private fun mockBluePrintRuntimeService(): BluePrintRuntimeService<*> {
+        val bluePrintRuntimeService = mockk<BluePrintRuntimeService<*>>()
+        every { bluePrintRuntimeService.id() } returns "123456"
+        return bluePrintRuntimeService
+    }
+
+    private fun prepareSimulation(successes: List<String>?, failures: List<String>?): MutableMap<String, EdgeLabel> {
+        val simulatedState: MutableMap<String, EdgeLabel> = hashMapOf()
+        successes?.forEach {
+            simulatedState[it] = EdgeLabel.SUCCESS
+        }
+        failures?.forEach {
+            simulatedState[it] = EdgeLabel.FAILURE
+        }
+        return simulatedState
+    }
+}
+
+class TestBluePrintWorkFlowService(graph: Graph)
+    : AbstractBluePrintWorkFlowService<String, String>(graph) {
+
+    lateinit var simulatedState: MutableMap<String, EdgeLabel>
+
+    override suspend fun initializeWorkflow(input: String): EdgeLabel {
+        return EdgeLabel.SUCCESS
+    }
+
+    override suspend fun prepareNodeExecutionMessage(node: Graph.Node)
+            : NodeExecuteMessage<String, String> {
+        val deferredNodeOutput = CompletableDeferred<String>()
+        val nodeExecuteMessage = NodeExecuteMessage(node, "$node Input", deferredNodeOutput)
+        return nodeExecuteMessage
+    }
+
+    override suspend fun executeNode(node: Graph.Node, nodeInput: String,
+                                     deferredNodeOutput: CompletableDeferred<String>,
+                                     deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+//        val random = (1..10).random() * 1000
+//        println("will reply in $random ms")
+//        kotlinx.coroutines.delay(random.toLong())
+        val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
+        deferredNodeStatus.complete(status)
+        deferredNodeOutput.complete("$node, Output: $nodeInput output")
+    }
+
+    override suspend fun prepareNodeSkipMessage(node: Graph.Node)
+            : NodeSkipMessage<String, String> {
+        val deferredNodeOutput = CompletableDeferred<String>()
+        val nodeSkipMessage = NodeSkipMessage(node, "$node Skip Input", deferredNodeOutput)
+        return nodeSkipMessage
+    }
+
+    override suspend fun skipNode(node: Graph.Node, nodeInput: String,
+                                  deferredNodeOutput: CompletableDeferred<String>,
+                                  deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+        val status = simulatedState[node.id] ?: throw BluePrintException("failed to get status for the node($node)")
+        deferredNodeStatus.complete(status)
+    }
+
+    override suspend fun cancelNode(node: Graph.Node, nodeInput: String,
+                                    deferredNodeOutput: CompletableDeferred<String>,
+                                    deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+        TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+    }
+
+    override suspend fun restartNode(node: Graph.Node, nodeInput: String,
+                                     deferredNodeOutput: CompletableDeferred<String>,
+                                     deferredNodeStatus: CompletableDeferred<EdgeLabel>) {
+        TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+    }
+
+    override suspend fun prepareWorkflowOutput(): String {
+        return "Final Response"
+    }
+}
\ No newline at end of file