019f3180556758c9878f8fce63afeaf816d73d29
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
18
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
29
/**
 * Contract of a graph based workflow engine, parameterised by the workflow/node input type [In]
 * and the workflow/node output type [Out].
 *
 * [AbstractBluePrintWorkFlowService] provides the graph traversal and calls back into these functions.
 */
interface BluePrintWorkFlowService<In, Out> {

    /**
     * Executes the imperative workflow for [bluePrintRuntimeService] using the workflow [input].
     * The response is delivered through [output] rather than as a return value.
     */
    suspend fun executeWorkflow(
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        input: In,
        output: CompletableDeferred<Out>
    )

    /** Prepares the workflow for the given [input] before the first node is triggered. */
    suspend fun initializeWorkflow(input: In): EdgeLabel

    /** Builds the final workflow response once node processing has finished. */
    suspend fun prepareWorkflowOutput(): Out

    /** Prepares the execute message for [node]. */
    suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>

    /** Prepares the skip message for [node]. */
    suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>

    /**
     * Executes [node] with [nodeInput].
     * [deferredNodeStatus] must be completed with the resulting [EdgeLabel]; the abstract
     * implementation awaits it to decide which outgoing edges are followed.
     */
    suspend fun executeNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /**
     * Skips [node].
     * [deferredNodeStatus] must be completed, the abstract implementation awaits it before
     * marking the node as skipped.
     */
    suspend fun skipNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /** Cancels [node]; same parameter layout as [executeNode]. */
    suspend fun cancelNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /** Restarts [node]; same parameter layout as [executeNode]. */
    suspend fun restartNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )
}
59
/** Workflow Message Types: the closed set of requests understood by the workflow actor. */
sealed class WorkflowMessage<In, Out>

/** Request to start the workflow with [input]; the result is published on [output]. */
class WorkflowExecuteMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/** Request to cancel the workflow; carries the same [input] / [output] pair as the execute request. */
class WorkflowCancelMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/** Request to restart the workflow; carries the same [input] / [output] pair as the execute request. */
class WorkflowRestartMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
68
/** Node Message Types: the closed set of requests understood by the node actor. */
sealed class NodeMessage<In, Out>

/** Signals that [fromEdge] has been resolved with [edgeAction] for the edge's target node. */
class NodeReadyMessage<In, Out>(
    val fromEdge: Graph.Edge,
    val edgeAction: EdgeAction
) : NodeMessage<In, Out>()

/** Request to execute [node] with [nodeInput]; the node result is published on [nodeOutput]. */
class NodeExecuteMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()

/** Request to restart [node]; same payload as [NodeExecuteMessage]. */
class NodeRestartMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()

/** Request to skip [node]; same payload as [NodeExecuteMessage]. */
class NodeSkipMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()

/** Request to cancel [node]; same payload as [NodeExecuteMessage]. */
class NodeCancelMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()
85
/**
 * Action carried by a [NodeReadyMessage] for an incoming edge.
 *
 * @property id lower-case textual identifier of the action.
 */
enum class EdgeAction(val id: String) {

    /** The edge was followed; its target node is a candidate for execution. */
    EXECUTE("execute"),

    /** The edge was not followed; its target node is a candidate for skipping. */
    SKIP("skip")
}
90
/**
 * Abstract workflow service implementation.
 *
 * Walks [graph] with two actors that share a single [Job]:
 *  - [workflowActor] receives [WorkflowMessage]s, triggers the first node and publishes the
 *    workflow output once the node actor's channel is closed;
 *  - the private node actor receives [NodeMessage]s and moves nodes through ready, execute and skip
 *    processing, calling back into the [BluePrintWorkFlowService] functions implemented by subclasses.
 */
abstract class AbstractBluePrintWorkFlowService<In, Out>(private val graph: Graph)
    : CoroutineScope, BluePrintWorkFlowService<In, Out> {

    private val log = logger(AbstractBluePrintWorkFlowService::class)

    private val job = Job()

    /** Identifier used in log and error messages; assigned by [executeWorkflow]. */
    lateinit var workflowId: String

    final override val coroutineContext: CoroutineContext
        get() = job + CoroutineName("Wf")

    /** Start nodes of the graph; the first one is used as the workflow entry point. */
    val root = graph.startNodes()

    /**
     * Cancels the shared job of both actors.
     *
     * @throws CancellationException always, after the job has been cancelled.
     */
    fun cancel() {
        log.info("Received workflow($workflowId) cancel request")
        job.cancel()
        throw CancellationException("Workflow($workflowId) cancelled as requested ...")
    }

    /**
     * Fails [output] with the failure (or cancellation) cause of this job, if there is one.
     * Completing an already completed deferred is a no-op, so successful paths are not affected;
     * the hook only guarantees that a caller awaiting [output] is never left waiting forever.
     */
    private fun Job.failOutputOnError(output: CompletableDeferred<Out>) {
        invokeOnCompletion { failure ->
            if (failure != null) output.completeExceptionally(failure)
        }
    }

    // Declared before workflowActor because that actor's body dereferences this property.
    private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Send message to process from one state to other state */
        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
            channel.send(nodeMessage)
        }

        /** Queue an EXECUTE ready message for every outgoing edge of [node] labelled [nodeState]. */
        fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
            val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
            log.debug("Next Edges :$stateEdges")
            stateEdges.forEach { stateEdge ->
                // Prepare next node ready message and Send NodeReadyMessage
                val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
                sendNodeMessage(nodeReadyMessage)
            }
        }

        /** Turn a ready message into the skip or execute message of the edge's target node. */
        suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
            val node = message.fromEdge.target
            when (message.edgeAction) {
                EdgeAction.SKIP -> sendNodeMessage(prepareNodeSkipMessage(node))
                EdgeAction.EXECUTE -> sendNodeMessage(prepareNodeExecutionMessage(node))
            }
        }

        /**
         * Record the outcome of the incoming edge and, once no incoming edge of the target node is
         * NOT_STARTED any more, trigger the node. A single executed edge is enough to execute it.
         */
        suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
            val edge = message.fromEdge
            val node = edge.target
            log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
            // Update the current incoming edge status to executed or skipped
            when (message.edgeAction) {
                EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
                EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
            }
            val incomingEdges = graph.incomingEdges(node.id)
            if (incomingEdges.size <= 1) {
                triggerToExecuteOrSkip(message)
                return
            }
            // Check all incoming edges executed or skipped
            val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
            if (notCompletedEdges.isNotEmpty()) {
                log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
                return
            }
            // Possibility of skip edge action performed at last, but other edges have execute action.
            val executePresent = incomingEdges.any { it.status == EdgeStatus.EXECUTED }
            val newMessage = if (executePresent) {
                NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
            } else {
                message
            }
            triggerToExecuteOrSkip(newMessage)
        }

        /**
         * Execute the node, then either close the channel (end node) or notify the successors:
         * edges not matching the resulting state are skipped, matching edges are executed.
         */
        fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
            val node = message.node
            node.status = NodeStatus.EXECUTING
            // Graph start and end nodes are not delegated to executeNode, they always succeed.
            val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
                    || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
                EdgeLabel.SUCCESS
            } else {
                log.debug("##### Processing workflow($workflowId) node($node) #####")
                // Call the Extension function and get the next Edge state.
                val deferredNodeState = CompletableDeferred<EdgeLabel>()
                executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
                deferredNodeState.await()
            }
            // Update Node Completed
            node.status = NodeStatus.EXECUTED
            log.info("Execute Node($node) -> Executed State($nodeState)")

            if (graph.isEndNode(node)) {
                // Closing the channel is the completion signal observed by the workflow actor
                channel.close()
            } else {
                val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
                log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
                // Process Skip Edges
                skippingEdges.forEach { skippingEdge ->
                    val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
                    sendNodeMessage(nodeReadyMessage)
                }
                // Process Success Node
                processNextNodes(node, nodeState)
            }
        }

        /** Skip the node once none of its incoming edges is NOT_STARTED, then cascade the skip. */
        fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
            val node = message.node
            val incomingEdges = graph.incomingEdges(node.id)
            // Check All Incoming Nodes Skipped
            val nonSkippedEdges = incomingEdges.filter {
                it.status == EdgeStatus.NOT_STARTED
            }
            log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")

            if (nonSkippedEdges.isEmpty()) {
                log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
                // Call the Extension Function
                val deferredNodeState = CompletableDeferred<EdgeLabel>()
                skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
                val nodeState = deferredNodeState.await()
                log.info("Skip Node($node) -> Executed State($nodeState)")
                // Mark the Current node as Skipped
                node.status = NodeStatus.SKIPPED
                // Look for next possible skip nodes
                graph.outgoingEdges(node.id).forEach { outgoingEdge ->
                    val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
                    sendNodeMessage(nodeReadyMessage)
                }
            }
        }

        fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
            TODO()
        }

        /** Close the node channel and end the launched coroutine with a cancellation. */
        fun cancelNodeWorker(message: NodeCancelMessage<In, Out>) = launch {
            channel.close()
            throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
        }

        /** Process each actor message received based on type **/
        consumeEach { nodeMessage ->
            // Exhaustive over the sealed NodeMessage hierarchy: no message type is dropped silently.
            when (nodeMessage) {
                is NodeReadyMessage<In, Out> -> {
                    // Processed inline, so edge status updates are applied one message at a time
                    readyNodeWorker(nodeMessage)
                }
                // The workers below launch their own coroutine, no extra launch is needed here.
                is NodeExecuteMessage<In, Out> -> {
                    executeNodeWorker(nodeMessage)
                }
                is NodeSkipMessage<In, Out> -> {
                    skipNodeWorker(nodeMessage)
                }
                is NodeRestartMessage<In, Out> -> {
                    restartNodeWorker(nodeMessage)
                }
                is NodeCancelMessage<In, Out> -> {
                    cancelNodeWorker(nodeMessage)
                }
            }
        }
    }

    /** Entry actor of the workflow; [executeWorkflow] sends the [WorkflowExecuteMessage] to it. */
    val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Send message from workflow actor to node actor */
        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
            nodeActor.send(nodeMessage)
        }

        /** Process the workflow execution message */
        suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
            // Prepare Workflow and Populate the Initial store
            initializeWorkflow(workflowExecuteMessage.input)

            val startNode = root.first()
            // Start node doesn't wait for any nodes, so we can pass Execute message directly
            val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
            sendNodeMessage(nodeExecuteMessage)
            log.debug("First node triggered successfully, waiting for response")

            // Wait for workflow completion or Error
            nodeActor.invokeOnClose { exception ->
                launch {
                    log.debug("End Node Completed, processing completion message")
                    val workflowOutput = prepareWorkflowOutput()
                    workflowExecuteMessage.output.complete(workflowOutput)
                    channel.close(exception)
                }.failOutputOnError(workflowExecuteMessage.output)
            }
        }

        /** Process each actor message received based on type */
        consumeEach { message ->
            when (message) {
                is WorkflowExecuteMessage<In, Out> -> {
                    launch {
                        executeMessageActor(message)
                    }.failOutputOnError(message.output)
                }
                is WorkflowRestartMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }.failOutputOnError(message.output)
                }
                is WorkflowCancelMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }.failOutputOnError(message.output)
                }
            }
        }
    }

    /**
     * Stores the runtime service id as [workflowId], validates the workflow and hands the
     * execute request to [workflowActor]. This function returns once the message is sent;
     * the result (or a failure) is delivered through [output].
     */
    override suspend fun executeWorkflow(
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        input: In,
        output: CompletableDeferred<Out>
    ) {
        log.info("Executing Graph : $graph")
        this.workflowId = bluePrintRuntimeService.id()
        validateWorkflow()
        val startMessage = WorkflowExecuteMessage(input, output)
        workflowActor.send(startMessage)
    }

    /** Validation hook called before the workflow starts; performs no checks by default. */
    open fun validateWorkflow() {
        //check(!graph.findCycles().isNotEmpty()) { "Graph is cyclic, Cycle is not supported" }
    }
}