91c2bcbbbb6dc951adda66a1d5670c704f3692d9
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
18
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
29
/**
 * Contract of a workflow engine that walks a [Graph] node by node.
 *
 * [In] is the type handed to the workflow and to each node as input, [Out] is the type produced
 * as output. Results are handed back through [CompletableDeferred] instances instead of return values.
 */
interface BluePrintWorkFlowService<In, Out> {

    /**
     * Executes the imperative workflow [graph] for the runtime service [bluePrintRuntimeService]
     * with the workflow [input]; the response is retrieved from [output].
     */
    suspend fun executeWorkflow(
        graph: Graph,
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        input: In,
        output: CompletableDeferred<Out>
    )

    /** Initializes the workflow for the given [input] and returns an [EdgeLabel]. */
    suspend fun initializeWorkflow(input: In): EdgeLabel

    /** Builds the workflow output value. */
    suspend fun prepareWorkflowOutput(): Out

    /** Prepares the execution message for [node]. */
    suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>

    /** Prepares the skip message for [node]. */
    suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>

    /**
     * Executes [node] with [nodeInput].
     *
     * [AbstractBluePrintWorkFlowService] awaits [deferredNodeStatus] right after calling this hook and
     * uses the resulting [EdgeLabel] to select the outgoing edges, so implementations must complete it.
     */
    suspend fun executeNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /**
     * Skips [node].
     *
     * [AbstractBluePrintWorkFlowService] awaits [deferredNodeStatus] right after calling this hook,
     * so implementations must complete it.
     */
    suspend fun skipNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /** Cancel hook for [node]; takes the same arguments as [executeNode]. */
    suspend fun cancelNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )

    /** Restart hook for [node]; takes the same arguments as [executeNode]. */
    suspend fun restartNode(
        node: Graph.Node,
        nodeInput: In,
        deferredNodeOutput: CompletableDeferred<Out>,
        deferredNodeStatus: CompletableDeferred<EdgeLabel>
    )
}
59
/**
 * Base type of every message accepted by the workflow actor.
 *
 * The hierarchy is sealed: the subtypes are the execute, cancel and restart messages declared in this file.
 */
sealed class WorkflowMessage<In, Out>
62
/** Workflow message asking to execute the workflow with [input]; the result is delivered through [output]. */
class WorkflowExecuteMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
64
/** Workflow message for a cancel request, carrying the workflow [input] and the deferred [output]. */
class WorkflowCancelMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
66
/** Workflow message for a restart request, carrying the workflow [input] and the deferred [output]. */
class WorkflowRestartMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
68
/**
 * Base type of every message accepted by the node actor.
 *
 * The hierarchy is sealed: the subtypes are the ready, execute, restart, skip and cancel messages
 * declared in this file.
 */
sealed class NodeMessage<In, Out>
71
/**
 * Node message announcing that the incoming edge [fromEdge] has been resolved with [edgeAction];
 * the node concerned is the target of [fromEdge].
 */
class NodeReadyMessage<In, Out>(
    val fromEdge: Graph.Edge,
    val edgeAction: EdgeAction
) : NodeMessage<In, Out>()
73
/** Node message asking to execute [node] with [nodeInput]; the node result goes to [nodeOutput]. */
class NodeExecuteMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()
76
/** Node message for a restart request, carrying [node], its [nodeInput] and the deferred [nodeOutput]. */
class NodeRestartMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()
79
/** Node message asking to skip [node]; carries the [nodeInput] and the deferred [nodeOutput]. */
class NodeSkipMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()
82
/** Node message for a cancel request, carrying [node], its [nodeInput] and the deferred [nodeOutput]. */
class NodeCancelMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: CompletableDeferred<Out>
) : NodeMessage<In, Out>()
85
/**
 * What should happen to the node an edge points to.
 *
 * @property id lower-case textual identifier of the action
 */
enum class EdgeAction(val id: String) {

    /** The edge was taken: its target node is to be executed. */
    EXECUTE("execute"),

    /** The edge was not taken: its target node is to be skipped. */
    SKIP("skip")
}
90
/**
 * Actor based skeleton implementation of [BluePrintWorkFlowService].
 *
 * Two actors cooperate on the shared [graph]:
 *  - [workflowActor] receives [WorkflowMessage]s and starts the workflow;
 *  - a private node actor receives [NodeMessage]s and walks the graph, delegating the actual node
 *    work to the [executeNode] and [skipNode] hooks of the concrete subclass.
 *
 * Both actors are started in this scope's [coroutineContext] and therefore share one [Job].
 */
abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {

    /** Graph being processed; assigned by [executeWorkflow]. */
    lateinit var graph: Graph

    private val log = logger(AbstractBluePrintWorkFlowService::class)

    /** Job that is part of [coroutineContext] and is cancelled by [cancel]. */
    private val job = Job()

    /** Identifier used in log messages; assigned by [executeWorkflow] from the runtime service id. */
    lateinit var workflowId: String

    final override val coroutineContext: CoroutineContext
        get() = job + CoroutineName("Wf")

    /**
     * Cancels the job shared by both actors and then signals the cancellation to the caller.
     *
     * @throws CancellationException always, after the job has been cancelled
     */
    fun cancel() {
        log.info("Received workflow($workflowId) cancel request")
        job.cancel()
        throw CancellationException("Workflow($workflowId) cancelled as requested ...")
    }

    /** Actor receiving the workflow level messages; [executeWorkflow] sends the execute message here. */
    val workflowActor = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Send message from workflow actor to node actor */
        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
            nodeActor.send(nodeMessage)
        }

        /**
         * Processes the workflow execution message: initializes the workflow, triggers the first start
         * node and registers the completion handler on the node actor.
         *
         * If any of these setup steps throws, the caller's output deferred is completed exceptionally
         * before the error is rethrown, so that a caller awaiting the output is not left waiting.
         */
        suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
            try {
                // Prepare Workflow and Populate the Initial store
                initializeWorkflow(workflowExecuteMessage.input)

                // Start node doesn't wait for any other node, so the Execute message is sent directly
                val startNode = graph.startNodes().first()
                sendNodeMessage(prepareNodeExecutionMessage(startNode))
                log.debug("First node triggered successfully, waiting for response")

                // Runs once the node actor channel is closed (end node reached or failure)
                nodeActor.invokeOnClose { exception ->
                    launch {
                        log.debug("End Node Completed, processing completion message")
                        val workflowOutput = prepareWorkflowOutput()
                        workflowExecuteMessage.output.complete(workflowOutput)
                        channel.close(exception)
                    }
                }
            } catch (e: CancellationException) {
                // Cancellation is not a setup failure; propagate it untouched
                throw e
            } catch (e: Exception) {
                // Hand the failure to the caller, then rethrow so the actor still fails as before
                workflowExecuteMessage.output.completeExceptionally(e)
                throw e
            }
        }

        /** Process each actor message received based on type */
        consumeEach { message ->
            when (message) {
                is WorkflowExecuteMessage<In, Out> -> launch { executeMessageActor(message) }
                // Restart and cancel are not implemented yet
                is WorkflowRestartMessage<In, Out> -> launch { TODO("") }
                is WorkflowCancelMessage<In, Out> -> launch { TODO("") }
            }
        }
    }

    /** Actor receiving the node level messages and driving the traversal of [graph]. */
    private val nodeActor = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Re-queues [nodeMessage] on this actor's own channel to move a node to its next state. */
        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
            channel.send(nodeMessage)
        }

        /** Sends an EXECUTE ready message along every outgoing edge of [node] labelled [nodeState]. */
        fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
            val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
            log.debug("Next Edges :$stateEdges")
            stateEdges.forEach { stateEdge ->
                sendNodeMessage(NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE))
            }
        }

        /** Turns a ready message into the skip or execute message of the edge's target node. */
        suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
            val node = message.fromEdge.target
            val nextMessage: NodeMessage<In, Out> = when (message.edgeAction) {
                EdgeAction.SKIP -> prepareNodeSkipMessage(node)
                EdgeAction.EXECUTE -> prepareNodeExecutionMessage(node)
            }
            sendNodeMessage(nextMessage)
        }

        /**
         * Records the outcome of the incoming edge and, once no incoming edge of the target node is
         * left in NOT_STARTED, triggers the node. A node with several incoming edges is executed when
         * at least one of them was executed, even if the last edge to arrive was skipped.
         */
        suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
            val edge = message.fromEdge
            val node = edge.target
            log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
            // Update the current incoming edge status to executed or skipped
            edge.status = when (message.edgeAction) {
                EdgeAction.SKIP -> EdgeStatus.SKIPPED
                EdgeAction.EXECUTE -> EdgeStatus.EXECUTED
            }

            val incomingEdges = graph.incomingEdges(node.id)
            if (incomingEdges.size <= 1) {
                // Nothing else to wait for
                triggerToExecuteOrSkip(message)
                return
            }

            val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
            if (notCompletedEdges.isNotEmpty()) {
                log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
                return
            }

            // Possibility of skip edge action performed at last, but other edges have execute action.
            val executePresent = incomingEdges.any { it.status == EdgeStatus.EXECUTED }
            val newMessage = if (executePresent) {
                NodeReadyMessage<In, Out>(message.fromEdge, EdgeAction.EXECUTE)
            } else {
                message
            }
            triggerToExecuteOrSkip(newMessage)
        }

        /**
         * Executes the node of [message] in its own coroutine. Start and end nodes are not passed to
         * [executeNode]; they resolve to SUCCESS directly. After an end node the actor channel is
         * closed, otherwise the outgoing edges are notified with SKIP or EXECUTE ready messages.
         */
        fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) = launch {
            val node = message.node
            node.status = NodeStatus.EXECUTING
            val isBoundaryNode = node.id == BluePrintConstants.GRAPH_START_NODE_NAME ||
                    node.id == BluePrintConstants.GRAPH_END_NODE_NAME
            val nodeState = if (isBoundaryNode) {
                EdgeLabel.SUCCESS
            } else {
                log.debug("##### Processing workflow($workflowId) node($node) #####")
                // Call the Extension function and get the next Edge state.
                val deferredNodeState = CompletableDeferred<EdgeLabel>()
                executeNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
                deferredNodeState.await()
            }
            // Update Node Completed
            node.status = NodeStatus.EXECUTED
            log.info("Execute Node($node) -> Executed State($nodeState)")

            if (graph.isEndNode(node)) {
                // End node: closing the channel triggers the completion handler of the workflow actor
                channel.close()
            } else {
                // Edges whose label differs from the node state lead to skipped targets
                val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
                log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
                skippingEdges.forEach { skippingEdge ->
                    sendNodeMessage(NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP))
                }
                // Edges matching the node state lead to the next nodes to execute
                processNextNodes(node, nodeState)
            }
        }

        /**
         * Skips the node of [message] in its own coroutine, provided none of its incoming edges is
         * still NOT_STARTED, and propagates SKIP along all of its outgoing edges.
         */
        fun skipNodeWorker(message: NodeSkipMessage<In, Out>) = launch {
            val node = message.node
            val incomingEdges = graph.incomingEdges(node.id)
            // Check All Incoming Nodes Skipped
            val nonSkippedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
            log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")

            if (nonSkippedEdges.isEmpty()) {
                log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
                // Call the Extension Function
                val deferredNodeState = CompletableDeferred<EdgeLabel>()
                skipNode(node, message.nodeInput, message.nodeOutput, deferredNodeState)
                val nodeState = deferredNodeState.await()
                log.info("Skip Node($node) -> Executed State($nodeState)")
                // Mark the Current node as Skipped
                node.status = NodeStatus.SKIPPED
                // Look for next possible skip nodes
                graph.outgoingEdges(node.id).forEach { outgoingEdge ->
                    sendNodeMessage(NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP))
                }
            }
        }

        /** Restart is not implemented yet. */
        fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
            TODO()
        }

        /**
         * Closes the node actor channel and cancels the launched coroutine.
         * NOTE(review): the [cancelNode] hook is not invoked here - confirm whether it should be.
         */
        fun cancelNodeWorker(message: NodeCancelMessage<In, Out>) = launch {
            channel.close()
            throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
        }

        /** Process each actor message received based on type **/
        consumeEach { nodeMessage ->
            when (nodeMessage) {
                // Runs inside the actor loop: the next message waits until this one is handled
                is NodeReadyMessage<In, Out> -> readyNodeWorker(nodeMessage)
                // The workers below launch their own coroutine, no extra launch needed
                is NodeExecuteMessage<In, Out> -> executeNodeWorker(nodeMessage)
                is NodeSkipMessage<In, Out> -> skipNodeWorker(nodeMessage)
                is NodeRestartMessage<In, Out> -> restartNodeWorker(nodeMessage)
                is NodeCancelMessage<In, Out> -> cancelNodeWorker(nodeMessage)
            }
        }
    }

    /**
     * Stores [graph] and the id of [bluePrintRuntimeService] on this service and sends the execute
     * message, built from [input] and [output], to [workflowActor]. The outcome is reported through
     * [output]; this function itself only enqueues the request.
     */
    override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
                                         input: In, output: CompletableDeferred<Out>) {
        log.info("Executing Graph : $graph")
        this.graph = graph
        this.workflowId = bluePrintRuntimeService.id()
        val startMessage = WorkflowExecuteMessage(input, output)
        workflowActor.send(startMessage)
    }
}