2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
/**
 * Contract of a workflow engine that walks an imperative workflow [Graph].
 *
 * [In] is the input type handed to the workflow and to every node; [Out] is the type carried by the
 * deferred outputs.
 */
interface BluePrintWorkFlowService<In, Out> {

    /**
     * Executes the imperative workflow [graph] for the runtime service [bluePrintRuntimeService] using
     * the workflow [input]; the response is retrieved from [output].
     */
    suspend fun executeWorkflow(
        graph: Graph,
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        input: In,
        output: CompletableDeferred<Out>
    )

    /**
     * Prepares the workflow for the given [input] and returns an [EdgeLabel].
     * NOTE(review): the meaning of the returned label is not visible from this declaration - confirm
     * against the implementations.
     */
    suspend fun initializeWorkflow(input: In): EdgeLabel

    /** Builds the overall output of the workflow. */
    suspend fun prepareWorkflowOutput(): Out

    /** Prepares the execute message for [node]. */
    suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>

    /** Prepares the skip message for [node]. */
    suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>

    /**
     * Executes [node] with [nodeInput]. The node result travels through [deferredNodeOutput] and the
     * resulting edge label is reported through [deferredNodeStatus].
     */
    suspend fun executeNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /**
     * Skips [node] for [nodeInput]. The resulting edge label is reported through [deferredNodeStatus];
     * [deferredNodeOutput] is the output handle belonging to the node.
     */
    suspend fun skipNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /**
     * Cancels [node]. Takes the same parameters as [executeNode].
     * NOTE(review): no caller is visible next to this declaration - confirm the expected completion
     * of the two deferred values.
     */
    suspend fun cancelNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /**
     * Restarts [node]. Takes the same parameters as [executeNode].
     * NOTE(review): no caller is visible next to this declaration - confirm the expected completion
     * of the two deferred values.
     */
    suspend fun restartNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )
}
/** Workflow message types: the base type of every message accepted by the workflow actor. */
sealed class WorkflowMessage<In, Out>

/** Requests execution of the workflow with [input]; the workflow result is delivered through [output]. */
class WorkflowExecuteMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/** Requests cancellation of the workflow; carries the workflow [input] and the deferred [output]. */
class WorkflowCancelMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/** Requests a restart of the workflow; carries the workflow [input] and the deferred [output]. */
class WorkflowRestartMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
/** Node message types: the base type of every message accepted by the node actor. */
sealed class NodeMessage<In, Out>

/**
 * Signals that the incoming edge [fromEdge] has been resolved for its target node, together with the
 * [edgeAction] (execute or skip) decided for that edge.
 */
class NodeReadyMessage<In, Out>(
    val fromEdge: Graph.Edge,
    val edgeAction: EdgeAction
) : NodeMessage<In, Out>()

/** Requests execution of [node] with [nodeInput]; [nodeOutput] is the deferred output of the node. */
class NodeExecuteMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()

/** Requests a restart of [node] with [nodeInput]; [nodeOutput] is the deferred output of the node. */
class NodeRestartMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()

/** Requests skipping of [node] for [nodeInput]; [nodeOutput] is the deferred output of the node. */
class NodeSkipMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()

/** Requests cancellation of [node] for [nodeInput]; [nodeOutput] is the deferred output of the node. */
class NodeCancelMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()
86 enum class EdgeAction(val id: String) {
91 /** Abstract workflow service implementation */
92 abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {
94 lateinit var graph: Graph
96 private val log = logger(AbstractBluePrintWorkFlowService::class)
98 private val job = Job()
100 lateinit var workflowId: String
102 final override val coroutineContext: CoroutineContext
103 get() = job + CoroutineName("Wf")
106 log.info("Received workflow($workflowId) cancel request")
108 throw CancellationException("Workflow($workflowId) cancelled as requested ...")
111 val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Hands [nodeMessage] over from the workflow actor to the node actor in a newly launched coroutine. */
fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>): Job {
    return launch {
        nodeActor.send(nodeMessage)
    }
}
118 /** Process the workflow execution message */
119 suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
120 // Prepare Workflow and Populate the Initial store
121 initializeWorkflow(workflowExecuteMessage.input)
123 val startNode = graph.startNodes().first()
124 // Prepare first node message and Send NodeExecuteMessage
125 // Start node doesn't wait for any nodes, so we can pass Execute message directly
126 val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
127 sendNodeMessage(nodeExecuteMessage)
128 log.debug("First node triggered successfully, waiting for response")
130 // Wait for workflow completion or Error
131 nodeActor.invokeOnClose { exception ->
133 log.debug("End Node Completed, processing completion message")
134 val workflowOutput = prepareWorkflowOutput()
135 workflowExecuteMessage.output.complete(workflowOutput)
136 channel.close(exception)
141 /** Process each actor message received based on type */
142 consumeEach { message ->
144 is WorkflowExecuteMessage<In, Out> -> {
146 executeMessageActor(message)
149 is WorkflowRestartMessage<In, Out> -> {
154 is WorkflowCancelMessage<In, Out> -> {
164 private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
/** Posts [nodeMessage] back onto this actor's own channel, moving processing from one state to the next. */
fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>): Job {
    return launch {
        channel.send(nodeMessage)
    }
}
/**
 * Cascades processing to the successors of [node]: every outgoing edge of [node] labelled with
 * [nodeState] gets a ready message carrying the execute action.
 */
fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
    // Only the edges matching the state the node finished with are followed here
    val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
    log.debug("Next Edges :$stateEdges")
    // No matching edges means the loop body never runs, so nothing is sent
    for (nextEdge in stateEdges) {
        sendNodeMessage(NodeReadyMessage<In, Out>(nextEdge, EdgeAction.EXECUTE))
    }
}
185 suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
186 val edge = message.fromEdge
187 val node = edge.target
188 // Check if current edge action is Skip or Execute
189 when (message.edgeAction) {
191 val skipMessage = prepareNodeSkipMessage(node)
192 sendNodeMessage(skipMessage)
194 EdgeAction.EXECUTE -> {
195 val nodeExecuteMessage = prepareNodeExecutionMessage(node)
196 sendNodeMessage(nodeExecuteMessage)
201 suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
202 val edge = message.fromEdge
203 val node = edge.target
204 log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
205 // Update the current incoming edge status to executed or skipped
206 when (message.edgeAction) {
207 EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
208 EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
210 val incomingEdges = graph.incomingEdges(node.id)
211 if (incomingEdges.size > 1) {
212 // Check all incoming edges executed or skipped
213 val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
214 if (notCompletedEdges.isEmpty()) {
215 // Possibility of skip edge action performed at last, but other edges have execute action.
216 val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
217 val newMessage = if (executePresent.isNotEmpty()) {
218 NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
222 triggerToExecuteOrSkip(newMessage)
224 log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
227 triggerToExecuteOrSkip(message)
231 fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
232 val node = message.node
233 node.status = NodeStatus.EXECUTING
234 val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
235 || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
238 log.debug("##### Processing workflow($workflowId) node($node) #####")
239 // Call the Extension function and get the next Edge state.
240 val deferredNodeState = CompletableDeferred<EdgeLabel>()
241 executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
242 deferredNodeState.await()
244 // Update Node Completed
245 node.status = NodeStatus.EXECUTED
246 log.info("Execute Node($node) -> Executed State($nodeState)")
248 // If End Node, Send End Message
249 if (graph.isEndNode(node)) {
250 // Close the current channel
253 val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
254 log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
255 // Process Skip Edges
256 skippingEdges.forEach { skippingEdge ->
257 // Prepare next node ready message and Send NodeReadyMessage
258 val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
259 sendNodeMessage(nodeReadyMessage)
261 // Process Success Node
262 processNextNodes(node, nodeState)
/**
 * Skips the node carried by [message] once none of its incoming edges is still in NOT_STARTED status,
 * then propagates the skip action along every outgoing edge of that node.
 */
fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
    val node = message.node
    val incomingEdges = graph.incomingEdges(node.id)
    // Incoming edges that have not been resolved yet
    val nonSkippedEdges = incomingEdges.filter { incomingEdge ->
        incomingEdge.status == EdgeStatus.NOT_STARTED
    }
    log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")

    // Nothing to do while at least one incoming edge is still pending
    if (nonSkippedEdges.isNotEmpty()) return@launch

    log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
    // Delegate to the extension function and wait for the state it reports
    val skipState = CompletableDeferred<EdgeLabel>()
    skipNode(node, message.nodeInput, message.nodeOutput, skipState)
    val nodeState = skipState.await()
    log.info("Skip Node($node) -> Executed State($nodeState)")
    // Mark the current node as skipped
    node.status = NodeStatus.SKIPPED
    // Every outgoing edge of a skipped node is offered to its target with the skip action
    for (outgoingEdge in graph.outgoingEdges(node.id)) {
        sendNodeMessage(NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP))
    }
}
292 fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
296 fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
298 throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
301 /** Process each actor message received based on type **/
302 consumeEach { nodeMessage ->
304 is NodeReadyMessage<In, Out> -> {
306 readyNodeWorker(nodeMessage)
308 is NodeExecuteMessage<In, Out> -> {
310 executeNodeWorker(nodeMessage)
313 is NodeSkipMessage<In, Out> -> {
315 skipNodeWorker(nodeMessage)
318 is NodeRestartMessage<In, Out> -> {
320 restartNodeWorker(nodeMessage)
327 override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
328 input: In, output: CompletableDeferred<Out>) {
329 log.info("Executing Graph : $graph")
331 this.workflowId = bluePrintRuntimeService.id()
332 val startMessage = WorkflowExecuteMessage(input, output)
333 workflowActor.send(startMessage)