5cec3c9479a6bb7568cc120605d5453eb56386e8
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
18
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
29
/**
 * Contract for running an imperative workflow [Graph].
 *
 * [In] is the type used for workflow and node input, [Out] the type used for workflow and node output.
 */
interface BluePrintWorkFlowService<In, Out> {

    /**
     * Executes the imperative workflow [graph] for the given [bluePrintRuntimeService] and workflow [input],
     * and returns the workflow output.
     */
    suspend fun executeWorkflow(
        graph: Graph,
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        input: In
    ): Out

    /**
     * Prepares the workflow for the given [input].
     *
     * [AbstractBluePrintWorkFlowService] calls this before the start node is dispatched and does not
     * use the returned [EdgeLabel].
     */
    suspend fun initializeWorkflow(
        input: In
    ): EdgeLabel

    /**
     * Builds the workflow output.
     *
     * [AbstractBluePrintWorkFlowService] calls this once its node actor has been closed and hands the
     * result to the caller waiting on the execute message.
     */
    suspend fun prepareWorkflowOutput(): Out

    /** Prepares the execute message for [node]. */
    suspend fun prepareNodeExecutionMessage(
        node: Graph.Node
    ): NodeExecuteMessage<In, Out>

    /** Prepares the skip message for [node]. */
    suspend fun prepareNodeSkipMessage(
        node: Graph.Node
    ): NodeSkipMessage<In, Out>

    /**
     * Executes [node] with [nodeInput] and [nodeOutput].
     *
     * The returned [EdgeLabel] is the node's resulting state; [AbstractBluePrintWorkFlowService] follows
     * the outgoing edges carrying that label and marks the remaining outgoing edges as skipped.
     */
    suspend fun executeNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel

    /**
     * Invoked when [node] is skipped instead of executed.
     *
     * [AbstractBluePrintWorkFlowService] only logs the returned [EdgeLabel].
     */
    suspend fun skipNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel

    /** Cancels [node]. NOTE(review): not invoked by any code visible in this file. */
    suspend fun cancelNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel

    /** Restarts [node]. NOTE(review): not invoked by any code visible in this file. */
    suspend fun restartNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel
}
54
/** Workflow message types: the messages accepted by the workflow actor. */
sealed class WorkflowMessage<In, Out>

/** Requests execution of the workflow with [input]; the workflow output is delivered through [output]. */
class WorkflowExecuteMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/**
 * Requests cancellation of the workflow.
 * NOTE(review): the workflow actor in this file does not implement handling for this message yet (TODO).
 */
class WorkflowCancelMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/**
 * Requests a restart of the workflow.
 * NOTE(review): the workflow actor in this file does not implement handling for this message yet (TODO).
 */
class WorkflowRestartMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
63
/** Node message types: the messages accepted by the node actor. */
sealed class NodeMessage<In, Out>

/**
 * Signals that the incoming edge [fromEdge] has been resolved with [edgeAction];
 * the node concerned is the target of that edge.
 */
class NodeReadyMessage<In, Out>(
    val fromEdge: Graph.Edge,
    val edgeAction: EdgeAction
) : NodeMessage<In, Out>()

/** Requests execution of [node] with [nodeInput] and [nodeOutput]. */
class NodeExecuteMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests a restart of [node] with [nodeInput] and [nodeOutput]. */
class NodeRestartMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests that [node] be skipped rather than executed. */
class NodeSkipMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests cancellation of [node]. NOTE(review): no handler for this message is visible in this file. */
class NodeCancelMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()
76
/** Action carried by a node-ready message for an incoming edge; [id] is its textual identifier. */
enum class EdgeAction(val id: String) {

    /** The edge was followed because its source node finished with the edge's label. */
    EXECUTE(id = "execute"),

    /** The edge was not followed; it is propagated as skipped. */
    SKIP(id = "skip")
}
81
/**
 * Abstract workflow service implementation.
 *
 * Walks the workflow [graph] with two actors: the workflow actor returned by [workflowActor], which accepts
 * [WorkflowMessage]s, and a private node actor, which accepts [NodeMessage]s and drives every node through
 * ready -> execute / skip. The actual work is delegated to the [BluePrintWorkFlowService] callbacks
 * implemented by subclasses.
 */
abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {

    /** Workflow graph to execute; must be assigned before an execute message is processed. */
    lateinit var graph: Graph

    private val log = logger(AbstractBluePrintWorkFlowService::class)

    /** Parent job of every actor and coroutine started by this service; cancelled by [cancel]. */
    private val job = Job()

    /** Identifier used in log and error messages; must be assigned before the workflow is started. */
    lateinit var workflowId: String

    /**
     * Errors collected while the workflow runs.
     *
     * Node workers are launched as separate coroutines and may append concurrently, so the default list
     * is a synchronized wrapper instead of a bare ArrayList.
     */
    var exceptions: MutableList<Exception> = java.util.Collections.synchronizedList(arrayListOf<Exception>())

    final override val coroutineContext: CoroutineContext
        get() = job + CoroutineName("Wf")

    /**
     * Cancels [job] (and with it every coroutine started in this scope) and then always throws.
     *
     * @throws CancellationException always, after the job has been cancelled.
     */
    fun cancel() {
        log.info("Received workflow($workflowId) cancel request")
        job.cancel()
        throw CancellationException("Workflow($workflowId) cancelled as requested")
    }

    /**
     * Creates the workflow actor.
     *
     * For a [WorkflowExecuteMessage] the actor starts a node actor, dispatches the start node and, once the
     * node actor is closed, completes the message's `output` with [prepareWorkflowOutput] and closes itself.
     * Restart and cancel messages are not implemented yet.
     */
    suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /**
         * Publishes the workflow result to the caller and closes this actor.
         * A failure while preparing the output is handed to the caller instead of leaving it waiting forever.
         */
        suspend fun completeWorkflow(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
            try {
                workflowExecuteMessage.output.complete(prepareWorkflowOutput())
            } catch (e: Exception) {
                workflowExecuteMessage.output.completeExceptionally(e)
            } finally {
                channel.close()
            }
        }

        /** Process the workflow execution message */
        suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {

            val nodeActor = nodeActor()

            // Register the completion handler before anything below can fail, so that every way of closing
            // the node actor (end node reached, node failure, start-up failure) completes the workflow output.
            nodeActor.invokeOnClose { exception ->
                launch {
                    if (exception != null) exceptions.add(BluePrintProcessorException(exception))
                    log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions")
                    completeWorkflow(workflowExecuteMessage)
                }
            }

            try {
                // Prepare Workflow and Populate the Initial store
                initializeWorkflow(workflowExecuteMessage.input)

                val startNode = graph.startNodes().first()
                // Prepare first node message and Send NodeExecuteMessage
                // Start node doesn't wait for any nodes, so we can pass Execute message directly
                val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
                /** Send message from workflow actor to node actor */
                launch {
                    nodeActor.send(nodeExecuteMessage)
                }
            } catch (e: Exception) {
                // Start-up failed before any node was dispatched: record the error first, then close the
                // node actor so the handler above reports the failure and no idle actor is left behind.
                exceptions.add(e)
                nodeActor.close()
            }
        }

        /** Process each actor message received based on type */
        consumeEach { message ->
            when (message) {
                is WorkflowExecuteMessage<In, Out> -> {
                    launch {
                        try {
                            executeMessageActor(message)
                        } catch (e: Exception) {
                            // Last resort (e.g. the node actor could not be set up): never leave the caller
                            // suspended on an output that nobody will complete.
                            exceptions.add(e)
                            message.output.completeExceptionally(e)
                            channel.close()
                        }
                    }
                }
                is WorkflowRestartMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }
                }
                is WorkflowCancelMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }
                }
            }
        }
    }

    /**
     * Creates the node actor that processes [NodeMessage]s for [graph].
     *
     * Ready messages are handled inline in the receive loop; execute, skip and restart messages are each
     * handled in their own launched coroutine. Any worker failure is added to [exceptions] and closes the
     * actor's channel.
     */
    private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Send message to process from one state to other state */
        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
            channel.send(nodeMessage)
        }

        /** Process the cascade node processing, based on the previous state of the node */
        fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
            // Follow only the outgoing edges labelled with the node's resulting state
            graph.outgoingEdges(node.id, arrayListOf(nodeState)).forEach { stateEdge ->
                // Prepare next node ready message and Send NodeReadyMessage
                val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
                sendNodeMessage(nodeReadyMessage)
            }
        }

        /** Turns a ready message into a skip or execute message for the target node of its edge. */
        suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
            val edge = message.fromEdge
            val node = edge.target
            // Check if current edge action is Skip or Execute
            when (message.edgeAction) {
                EdgeAction.SKIP -> {
                    val skipMessage = prepareNodeSkipMessage(node)
                    sendNodeMessage(skipMessage)
                }
                EdgeAction.EXECUTE -> {
                    val nodeExecuteMessage = prepareNodeExecutionMessage(node)
                    sendNodeMessage(nodeExecuteMessage)
                }
            }
        }

        /**
         * Records the outcome of one incoming edge and triggers the target node once none of its incoming
         * edges is left in NOT_STARTED state.
         */
        suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
            val edge = message.fromEdge
            val node = edge.target
            log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
            // Update the current incoming edge status to executed or skipped
            when (message.edgeAction) {
                EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
                EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
            }
            val incomingEdges = graph.incomingEdges(node.id)
            if (incomingEdges.size > 1) {
                // Check all incoming edges executed or skipped
                val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
                if (notCompletedEdges.isEmpty()) {
                    // Possibility of skip edge action performed at last, but other edges have execute action.
                    val executePresent = incomingEdges.any { it.status == EdgeStatus.EXECUTED }
                    val newMessage = if (executePresent) {
                        NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
                    } else {
                        message
                    }
                    triggerToExecuteOrSkip(newMessage)
                } else {
                    log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)")
                }
            } else {
                triggerToExecuteOrSkip(message)
            }
        }

        /**
         * Executes the node of [message] and cascades to its successors.
         *
         * Start and end nodes are not passed to [executeNode]; they always yield [EdgeLabel.SUCCESS].
         * Reaching the end node closes this actor's channel.
         *
         * @throws BluePrintProcessorException when a non-end node has no outgoing edge for its resulting state.
         */
        suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
            val node = message.node
            node.status = NodeStatus.EXECUTING
            val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
                    || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
                EdgeLabel.SUCCESS
            } else {
                log.debug("##### Processing workflow($workflowId) node($node) #####")
                // Call the Extension function and get the next Edge state.
                executeNode(node, message.nodeInput, message.nodeOutput)
            }
            // Update Node Completed
            node.status = NodeStatus.EXECUTED
            log.info("Execute node(${node.id}) -> executed state($nodeState)")
            // Check if the Node status edge is there, If not close processing
            val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty()

            // If End Node, Send End Message
            if (graph.isEndNode(node)) {
                // Close the current channel
                channel.close()
            } else if (!edgePresent) {
                throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.")
            } else {
                val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
                log.debug("Skipping node($node)'s outgoing edges($skippingEdges)")
                // Process Skip Edges
                skippingEdges.forEach { skippingEdge ->
                    // Prepare next node ready message and Send NodeReadyMessage
                    val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
                    sendNodeMessage(nodeReadyMessage)
                }
                // Process Success Node
                processNextNodes(node, nodeState)
            }
        }

        /**
         * Skips the node of [message] once none of its incoming edges is in NOT_STARTED state, then
         * propagates the skip along every outgoing edge of the node.
         */
        suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
            val node = message.node
            val incomingEdges = graph.incomingEdges(node.id)
            // Check All Incoming Nodes Skipped
            val nonSkippedEdges = incomingEdges.filter {
                it.status == EdgeStatus.NOT_STARTED
            }
            log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")

            if (nonSkippedEdges.isEmpty()) {
                log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
                // Call the Extension Function
                val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
                log.info("Skip node(${node.id}) -> executed state($nodeState)")
                // Mark the Current node as Skipped
                node.status = NodeStatus.SKIPPED
                // Look for next possible skip nodes
                graph.outgoingEdges(node.id).forEach { outgoingEdge ->
                    val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
                    sendNodeMessage(nodeReadyMessage)
                }
            }
        }

        /** Node restart is not implemented yet. */
        fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
            TODO()
        }

        /** Closes the node channel on cancellation. NOTE(review): not referenced by the receive loop below. */
        fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
            channel.close()
            throw CancellationException("Workflow($workflowId) actor cancelled as requested.")
        }

        /** Process each actor message received based on type **/
        consumeEach { nodeMessage ->
            when (nodeMessage) {
                is NodeReadyMessage<In, Out> -> {
                    // Blocking call
                    try {
                        readyNodeWorker(nodeMessage)
                    } catch (e: Exception) {
                        exceptions.add(e)
                        channel.close()
                    }
                }
                is NodeExecuteMessage<In, Out> -> {
                    launch {
                        try {
                            executeNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            nodeMessage.node.status = NodeStatus.TERMINATED
                            exceptions.add(e)
                            channel.close()
                        }
                    }
                }
                is NodeSkipMessage<In, Out> -> {
                    launch {
                        try {
                            skipNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            nodeMessage.node.status = NodeStatus.TERMINATED
                            exceptions.add(e)
                            channel.close()
                        }
                    }
                }
                is NodeRestartMessage<In, Out> -> {
                    launch {
                        try {
                            restartNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            exceptions.add(e)
                            channel.close()
                        }
                    }
                }
            }
        }
    }
}