2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
30 interface BluePrintWorkFlowService<In, Out> {
32 /**
 * Executes the imperative workflow using [bluePrintRuntimeService] and the workflow input [input];
 * the response is delivered through [output].
 *
 * @param bluePrintRuntimeService runtime service the workflow is executed for
 * @param input workflow input
 * @param output deferred through which the workflow response is retrieved
 */
34 suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>,
35 input: In, output: CompletableDeferred<Out>)
/**
 * Initializes the workflow from [input] before any node is triggered.
 *
 * [AbstractBluePrintWorkFlowService] calls this first when it processes a [WorkflowExecuteMessage];
 * the returned [EdgeLabel] is not used at that call site.
 */
37 suspend fun initializeWorkflow(input: In): EdgeLabel
/**
 * Builds the final workflow output.
 *
 * [AbstractBluePrintWorkFlowService] calls this from the close handler it registers on the node actor
 * and completes the caller's output deferred with the returned value.
 */
39 suspend fun prepareWorkflowOutput(): Out
41 /**
 * Prepares the [NodeExecuteMessage] that is sent to the node actor to execute [node].
 *
 * @param node graph node the message is built for
 */
42 suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
/**
 * Prepares the [NodeSkipMessage] that is sent to the node actor to skip [node].
 *
 * @param node graph node the message is built for
 */
44 suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
/**
 * Executes [node] with [nodeInput].
 *
 * [AbstractBluePrintWorkFlowService] creates a fresh [deferredNodeStatus] for every call and then
 * awaits it, so an implementation has to complete it; the awaited [EdgeLabel] is the node state used
 * to pick the outgoing edges that are followed or skipped.
 *
 * @param node node to execute
 * @param nodeInput input carried by the node execute message
 * @param deferredNodeOutput output deferred carried by the node execute message
 * @param deferredNodeStatus deferred to complete with the resulting edge label
 */
46 suspend fun executeNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
47 deferredNodeStatus: CompletableDeferred<EdgeLabel>)
/**
 * Skips [node].
 *
 * [AbstractBluePrintWorkFlowService] creates a fresh [deferredNodeStatus] for every call and awaits it
 * before marking the node as skipped, so an implementation has to complete it.
 *
 * @param node node to skip
 * @param nodeInput input carried by the node skip message
 * @param deferredNodeOutput output deferred carried by the node skip message
 * @param deferredNodeStatus deferred to complete with the resulting edge label
 */
49 suspend fun skipNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
50 deferredNodeStatus: CompletableDeferred<EdgeLabel>)
/**
 * Cancels [node]; the parameters mirror [executeNode].
 *
 * NOTE(review): no call site is visible in this part of the file, so whether [deferredNodeStatus]
 * is awaited by the caller has to be confirmed.
 */
52 suspend fun cancelNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
53 deferredNodeStatus: CompletableDeferred<EdgeLabel>)
/**
 * Restarts [node]; the parameters mirror [executeNode].
 *
 * NOTE(review): no call site is visible in this part of the file, so whether [deferredNodeStatus]
 * is awaited by the caller has to be confirmed.
 */
55 suspend fun restartNode(node: Graph.Node, nodeInput: In, deferredNodeOutput: CompletableDeferred<Out>,
56 deferredNodeStatus: CompletableDeferred<EdgeLabel>)
60 /** Workflow message types: base class of the messages accepted by the workflow actor. */
61 sealed class WorkflowMessage<In, Out>
/**
 * Requests execution of the workflow.
 *
 * @property input workflow input, passed to `initializeWorkflow` when the message is processed
 * @property output deferred that is completed with the prepared workflow output
 */
63 class WorkflowExecuteMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
/**
 * Requests cancellation of the workflow.
 *
 * @property input workflow input carried with the request
 * @property output deferred carried with the request
 */
65 class WorkflowCancelMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
/**
 * Requests a restart of the workflow.
 *
 * @property input workflow input carried with the request
 * @property output deferred carried with the request
 */
67 class WorkflowRestartMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
69 /** Node message types: base class of the messages accepted by the node actor. */
70 sealed class NodeMessage<In, Out>
/**
 * Reports that an incoming edge of a node has been resolved.
 *
 * @property fromEdge resolved edge; its target is the node that may now be executed or skipped
 * @property edgeAction whether [fromEdge] is to be recorded as executed or as skipped
 */
72 class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
/**
 * Asks the node actor to execute [node].
 *
 * @property node node to execute
 * @property nodeInput input forwarded to `executeNode`
 * @property nodeOutput output deferred forwarded to `executeNode`
 */
74 class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
75 val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
/**
 * Asks the node actor to restart [node]; the node actor hands it to its restart worker.
 *
 * @property node node to restart
 * @property nodeInput input carried for the node
 * @property nodeOutput output deferred carried for the node
 */
77 class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
78 val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
/**
 * Asks the node actor to skip [node].
 *
 * @property node node to skip
 * @property nodeInput input forwarded to `skipNode`
 * @property nodeOutput output deferred forwarded to `skipNode`
 */
80 class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
81 val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
/**
 * Node-level cancel message.
 *
 * NOTE(review): no handling of this message type is visible in this part of the file.
 *
 * @property node node the cancellation refers to
 * @property nodeInput input carried for the node
 * @property nodeOutput output deferred carried for the node
 */
83 class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In,
84 val nodeOutput: CompletableDeferred<Out>) : NodeMessage<In, Out>()
86 enum class EdgeAction(val id: String) {
91 /** Abstract workflow service implementation */
92 abstract class AbstractBluePrintWorkFlowService<In, Out>(private val graph: Graph)
93 : CoroutineScope, BluePrintWorkFlowService<In, Out> {
// Logger shared by the workflow actor and the node actor.
95 private val log = logger(AbstractBluePrintWorkFlowService::class)
// Job that backs the coroutine context of this scope.
97 private val job = Job()
// Workflow identifier used in log and cancellation messages; assigned in executeWorkflow
// from the runtime service id, so it must not be read before that call.
99 lateinit var workflowId: String
// Each read combines the shared job with the coroutine name "Wf".
101 final override val coroutineContext: CoroutineContext
102 get() = job + CoroutineName("Wf")
// Start nodes of the graph; the first one is used as the workflow entry node.
104 val root = graph.startNodes()
107 log.info("Received workflow($workflowId) cancel request")
109 throw CancellationException("Workflow($workflowId) cancelled as requested ...")
112 val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
114 /**
 * Sends [nodeMessage] from the workflow actor to the node actor.
 * The send runs in a newly launched coroutine, whose Job is the return value.
 */
115 fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
116 nodeActor.send(nodeMessage)
119 /**
 * Processes the workflow execution message: initializes the workflow, triggers the start node and
 * registers a close handler on the node actor that publishes the workflow output.
 *
 * @param workflowExecuteMessage message carrying the workflow input and the deferred for the output
 */
120 suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
121 // Prepare Workflow and Populate the Initial store
122 initializeWorkflow(workflowExecuteMessage.input)
// Entry node: the first of the graph's start nodes.
124 val startNode = root.first()
125 // Prepare first node message and Send NodeExecuteMessage
126 // Start node doesn't wait for any nodes, so we can pass Execute message directly
127 val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
128 sendNodeMessage(nodeExecuteMessage)
129 log.debug("First node triggered successfully, waiting for response")
131 // Wait for workflow completion or Error
// The handler runs when the node actor's channel is closed; `exception` is the close cause.
132 nodeActor.invokeOnClose { exception ->
134 log.debug("End Node Completed, processing completion message")
135 val workflowOutput = prepareWorkflowOutput()
// Hand the prepared output to whoever waits on the execute message's deferred.
136 workflowExecuteMessage.output.complete(workflowOutput)
// Close the workflow actor's own channel with the same cause.
137 channel.close(exception)
142 /** Process each actor message received based on type */
143 consumeEach { message ->
145 is WorkflowExecuteMessage<In, Out> -> {
147 executeMessageActor(message)
150 is WorkflowRestartMessage<In, Out> -> {
155 is WorkflowCancelMessage<In, Out> -> {
165 private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
167 /**
 * Sends [nodeMessage] to this actor's own channel so that a node moves from one state to the next.
 * The send runs in a newly launched coroutine, whose Job is the return value.
 */
168 fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
169 channel.send(nodeMessage)
172 /**
 * Cascades processing to the nodes that follow [node]: for every outgoing edge that
 * `graph.outgoingEdges` returns for the label [nodeState], a [NodeReadyMessage] with
 * [EdgeAction.EXECUTE] is sent.
 *
 * @param node node that has just been processed
 * @param nodeState resulting state of [node], used as the edge label to follow
 */
173 fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
174 // Follow only the outgoing edges selected for nodeState (not necessarily a success label)
175 val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
176 log.debug("Next Edges :$stateEdges")
177 if (stateEdges.isNotEmpty()) {
178 stateEdges.forEach { stateEdge ->
179 // Prepare next node ready message and Send NodeReadyMessage
180 val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
181 sendNodeMessage(nodeReadyMessage)
/**
 * Converts a ready message into the concrete message for the edge's target node and sends it:
 * a node skip message or a node execute message, depending on the message's edge action.
 *
 * @param message ready message whose edge target is the node to trigger
 */
186 suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
187 val edge = message.fromEdge
188 val node = edge.target
189 // Check if current edge action is Skip or Execute
190 when (message.edgeAction) {
// NOTE(review): the branch label for the two lines below is not visible here - presumably EdgeAction.SKIP.
192 val skipMessage = prepareNodeSkipMessage(node)
193 sendNodeMessage(skipMessage)
195 EdgeAction.EXECUTE -> {
196 val nodeExecuteMessage = prepareNodeExecutionMessage(node)
197 sendNodeMessage(nodeExecuteMessage)
/**
 * Handles a [NodeReadyMessage]: records the outcome on the incoming edge and, for a node with several
 * incoming edges, triggers the node only once none of them is still NOT_STARTED.
 *
 * @param message ready message for the edge that has just been resolved
 */
202 suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
203 val edge = message.fromEdge
204 val node = edge.target
205 log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
206 // Update the current incoming edge status to executed or skipped
207 when (message.edgeAction) {
208 EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
209 EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
211 val incomingEdges = graph.incomingEdges(node.id)
// A node with more than one incoming edge has to wait for all of them.
212 if (incomingEdges.size > 1) {
213 // Check all incoming edges executed or skipped
214 val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
215 if (notCompletedEdges.isEmpty()) {
216 // Possibility of skip edge action performed at last, but other edges have execute action.
217 val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
// One executed incoming edge is enough to execute the node.
// NOTE(review): the alternative branch of this expression is not visible here.
218 val newMessage = if (executePresent.isNotEmpty()) {
219 NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
223 triggerToExecuteOrSkip(newMessage)
// Logged while at least one incoming edge is still NOT_STARTED; the node is not triggered yet.
225 log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
// NOTE(review): presumably the path for nodes with at most one incoming edge - the enclosing
// branch line is not visible here.
228 triggerToExecuteOrSkip(message)
/**
 * Executes the message's node in a newly launched coroutine and then fans out: outgoing edges not
 * matching the resulting state are reported as skipped, the matching ones are followed.
 *
 * @param message execute message carrying the node, its input and its output deferred
 */
232 fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
233 val node = message.node
234 node.status = NodeStatus.EXECUTING
// Graph start and end nodes take the first branch (its value is not visible here);
// every other node is delegated to executeNode.
235 val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
236 || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
239 log.debug("##### Processing workflow($workflowId) node($node) #####")
240 // Call the Extension function and get the next Edge state.
241 val deferredNodeState = CompletableDeferred<EdgeLabel>()
242 executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
// Suspends until executeNode has completed the deferred with the node's edge label.
243 deferredNodeState.await()
245 // Update Node Completed
246 node.status = NodeStatus.EXECUTED
247 log.info("Execute Node($node) -> Executed State($nodeState)")
249 // If End Node, Send End Message
250 if (graph.isEndNode(node)) {
251 // Close the current channel
// Outgoing edges that graph.outgoingEdgesNotInLabels returns for the resulting state are skipped.
254 val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
255 log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
256 // Process Skip Edges
257 skippingEdges.forEach { skippingEdge ->
258 // Prepare next node ready message and Send NodeReadyMessage
259 val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
260 sendNodeMessage(nodeReadyMessage)
262 // Process Success Node
263 processNextNodes(node, nodeState)
/**
 * Skips the message's node in a newly launched coroutine, provided none of its incoming edges is
 * still NOT_STARTED, and then propagates the skip along every outgoing edge of the node.
 *
 * @param message skip message carrying the node, its input and its output deferred
 */
267 fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
268 val node = message.node
269 val incomingEdges = graph.incomingEdges(node.id)
270 // Collect the incoming edges that are not resolved yet (status still NOT_STARTED)
271 val nonSkippedEdges = incomingEdges.filter {
272 it.status == EdgeStatus.NOT_STARTED
274 log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")
// Skip only when every incoming edge has been resolved.
276 if (nonSkippedEdges.isEmpty()) {
277 log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
278 // Call the Extension Function
279 val deferredNodeState = CompletableDeferred<EdgeLabel>()
280 skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
// Suspends until skipNode has completed the deferred; the value is only logged below.
281 val nodeState = deferredNodeState.await()
282 log.info("Skip Node($node) -> Executed State($nodeState)")
283 // Mark the Current node as Skipped
284 node.status = NodeStatus.SKIPPED
285 // Look for next possible skip nodes
286 graph.outgoingEdges(node.id).forEach { outgoingEdge ->
287 val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
288 sendNodeMessage(nodeReadyMessage)
293 fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
/**
 * Cancel worker of the node actor: launches a coroutine that throws a CancellationException
 * naming the workflow id.
 *
 * NOTE(review): part of this body is not visible here, and no call site is visible either.
 *
 * @param messageWorkflow workflow cancel message that triggered the cancellation
 */
297 fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
299 throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
302 /** Process each actor message received based on type **/
303 consumeEach { nodeMessage ->
305 is NodeReadyMessage<In, Out> -> {
307 readyNodeWorker(nodeMessage)
309 is NodeExecuteMessage<In, Out> -> {
311 executeNodeWorker(nodeMessage)
314 is NodeSkipMessage<In, Out> -> {
316 skipNodeWorker(nodeMessage)
319 is NodeRestartMessage<In, Out> -> {
321 restartNodeWorker(nodeMessage)
/**
 * Starts the workflow: stores the runtime service id as [workflowId] and sends a
 * [WorkflowExecuteMessage] built from [input] and [output] to the workflow actor.
 * The visible code does not await [output]; the result is delivered through it later.
 *
 * @param bluePrintRuntimeService runtime service whose id becomes the workflow id
 * @param input workflow input
 * @param output deferred that receives the workflow output
 */
329 override suspend fun executeWorkflow(bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In, output: CompletableDeferred<Out>) {
330 log.info("Executing Graph : $graph")
// The id appears in the actors' log and cancellation messages.
331 this.workflowId = bluePrintRuntimeService.id()
333 val startMessage = WorkflowExecuteMessage(input, output)
334 workflowActor.send(startMessage)
/**
 * Validation hook for the workflow graph, open for overriding.
 * The only body line visible here is a disabled (commented-out) cycle check.
 */
337 open fun validateWorkflow() {
338 //check(!graph.findCycles().isNotEmpty()) { "Graph is cyclic, Cycle is not supported" }