9051502130823bc197d1f00300c1913904066fc8
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
18
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
29
/**
 * Contract for running an imperative workflow described by a [Graph].
 *
 * @param In type of the workflow input, also handed to every node
 * @param Out type of the workflow output, also handed to every node
 */
interface BluePrintWorkFlowService<In, Out> {

    /**
     * Executes the imperative workflow [graph] for the runtime service [bluePrintRuntimeService]
     * with the workflow input [input]. The response is retrieved from [output].
     */
    suspend fun executeWorkflow(
        graph: Graph,
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        input: In,
        output: CompletableDeferred<Out>
    )

    /** Prepares the workflow for the given [input] and returns an [EdgeLabel]. */
    suspend fun initializeWorkflow(input: In): EdgeLabel

    /**
     * Builds the workflow output.
     *
     * @param exception the failure that ended the workflow, or `null` when there was none
     */
    suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out

    /** Prepares the execute message for [node]. */
    suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>

    /** Prepares the skip message for [node]. */
    suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>

    /** Executes [node] with [nodeInput] / [nodeOutput] and returns the resulting [EdgeLabel]. */
    suspend fun executeNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel

    /** Skips [node] with [nodeInput] / [nodeOutput] and returns the resulting [EdgeLabel]. */
    suspend fun skipNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel

    /** Cancels [node] with [nodeInput] / [nodeOutput] and returns the resulting [EdgeLabel]. */
    suspend fun cancelNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel

    /** Restarts [node] with [nodeInput] / [nodeOutput] and returns the resulting [EdgeLabel]. */
    suspend fun restartNode(
        node: Graph.Node,
        nodeInput: In,
        nodeOutput: Out
    ): EdgeLabel
}
55
/** Workflow message types: the messages accepted by the workflow-level actor. */
sealed class WorkflowMessage<In, Out>

/** Requests execution of the workflow with [input]; the result is delivered through [output]. */
class WorkflowExecuteMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/** Requests cancellation of the workflow; carries the workflow [input] and [output]. */
class WorkflowCancelMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/** Requests a restart of the workflow; carries the workflow [input] and [output]. */
class WorkflowRestartMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
64
/** Node message types: the messages accepted by the node-level actor. */
sealed class NodeMessage<In, Out>

/** Signals that [fromEdge] has been resolved with [edgeAction] for the edge's target node. */
class NodeReadyMessage<In, Out>(
    val fromEdge: Graph.Edge,
    val edgeAction: EdgeAction
) : NodeMessage<In, Out>()

/** Requests execution of [node] with the given [nodeInput] and [nodeOutput]. */
class NodeExecuteMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests a restart of [node] with the given [nodeInput] and [nodeOutput]. */
class NodeRestartMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests that [node] be skipped, carrying its [nodeInput] and [nodeOutput]. */
class NodeSkipMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests cancellation of [node], carrying its [nodeInput] and [nodeOutput]. */
class NodeCancelMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()
77
/**
 * Action carried along an edge towards its target node.
 *
 * @property id lower-case textual identifier of the action
 */
enum class EdgeAction(val id: String) {

    /** The target node is to be executed. */
    EXECUTE("execute"),

    /** The target node is to be skipped. */
    SKIP("skip")
}
82
/**
 * Abstract workflow service implementation.
 *
 * Walks a [Graph] using two actors: a workflow actor consuming [WorkflowMessage]s and a node actor
 * consuming [NodeMessage]s. This class owns the message routing and the edge/node status updates;
 * subclasses provide the hooks declared by [BluePrintWorkFlowService] (node execution, skipping,
 * message preparation and output preparation).
 */
abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {

    /** Graph being processed; assigned by [executeWorkflow]. */
    lateinit var graph: Graph

    private val log = logger(AbstractBluePrintWorkFlowService::class)

    /** Job that backs [coroutineContext]; cancelled by [cancel]. */
    private val job = Job()

    /** Identifier used in log messages; assigned by [executeWorkflow] from the runtime service id. */
    lateinit var workflowId: String

    final override val coroutineContext: CoroutineContext
        get() = job + CoroutineName("Wf")

    /**
     * Cancels the job of this scope and then throws a [CancellationException] to the caller.
     *
     * Reads [workflowId], so it must only be called after [executeWorkflow] has assigned it.
     */
    fun cancel() {
        log.info("Received workflow($workflowId) cancel request")
        job.cancel()
        throw CancellationException("Workflow($workflowId) cancelled as requested")
    }

    /**
     * Creates a new workflow actor with an unlimited mailbox.
     *
     * Only [WorkflowExecuteMessage] is implemented; restart and cancel messages hit a `TODO`.
     */
    fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
        /** Process the workflow execution message */
        suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {

            val nodeActor = nodeActor()

            // Register the completion handler before anything can fail, so that every way the node
            // actor gets closed (end node reached, node failure, or start-up failure below)
            // completes the caller's output instead of leaving it pending forever.
            nodeActor.invokeOnClose { exception ->
                launch {
                    log.info("End Node Completed, processing completion message")
                    val bluePrintProcessorException: BluePrintProcessorException? =
                        if (exception != null) BluePrintProcessorException(exception) else null

                    val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException)
                    workflowExecuteMessage.output.complete(workflowOutput)
                    channel.close(exception)
                }
            }

            try {
                // Prepare Workflow and Populate the Initial store
                initializeWorkflow(workflowExecuteMessage.input)

                val startNode = graph.startNodes().first()
                // Prepare first node message and Send NodeExecuteMessage
                // Start node doesn't wait for any nodes, so we can pass Execute message directly
                val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
                /** Send message from workflow actor to node actor */
                launch {
                    nodeActor.send(nodeExecuteMessage)
                }
            } catch (e: Exception) {
                // Start-up failed (initialization, missing start node, message preparation):
                // close the node actor with the cause; the handler above reports it to the caller.
                // Same catch-and-close convention as the node workers below.
                nodeActor.close(e)
            }
        }

        /** Process each actor message received based on type */
        consumeEach { message ->
            when (message) {
                is WorkflowExecuteMessage<In, Out> -> {
                    launch {
                        executeMessageActor(message)
                    }
                }
                is WorkflowRestartMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }
                }
                is WorkflowCancelMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }
                }
            }
        }
    }


    /**
     * Creates a new node actor with an unlimited mailbox. The actor closes its own channel when the
     * end node has been executed, or with the exception when a worker fails.
     */
    private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Send message to process from one state to other state */
        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
            channel.send(nodeMessage)
        }

        /** Sends an EXECUTE ready message along every outgoing edge of [node] labelled [nodeState]. */
        fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
            // Process only Next Success Node
            val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
            log.debug("Next Edges :$stateEdges")
            if (stateEdges.isNotEmpty()) {
                stateEdges.forEach { stateEdge ->
                    // Prepare next node ready message and Send NodeReadyMessage
                    val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
                    sendNodeMessage(nodeReadyMessage)
                }
            }
        }

        /** Turns a ready message into a skip or execute message for the edge's target node. */
        suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
            val edge = message.fromEdge
            val node = edge.target
            // Check if current edge action is Skip or Execute
            when (message.edgeAction) {
                EdgeAction.SKIP -> {
                    val skipMessage = prepareNodeSkipMessage(node)
                    sendNodeMessage(skipMessage)
                }
                EdgeAction.EXECUTE -> {
                    val nodeExecuteMessage = prepareNodeExecutionMessage(node)
                    sendNodeMessage(nodeExecuteMessage)
                }
            }
        }

        /**
         * Records the status of the incoming edge and triggers the target node once it is ready.
         *
         * A node with more than one incoming edge is triggered only when no incoming edge is still
         * NOT_STARTED; it is executed if at least one incoming edge was EXECUTED, otherwise the
         * action of the last arriving message is used.
         */
        suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
            val edge = message.fromEdge
            val node = edge.target
            log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
            // Update the current incoming edge status to executed or skipped
            when (message.edgeAction) {
                EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
                EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
            }
            val incomingEdges = graph.incomingEdges(node.id)
            if (incomingEdges.size > 1) {
                // Check all incoming edges executed or skipped
                val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
                if (notCompletedEdges.isEmpty()) {
                    // Possibility of skip edge action performed at last, but other edges have execute action.
                    val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
                    val newMessage = if (executePresent.isNotEmpty()) {
                        NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
                    } else {
                        message
                    }
                    triggerToExecuteOrSkip(newMessage)
                } else {
                    log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
                }
            } else {
                triggerToExecuteOrSkip(message)
            }
        }

        /**
         * Executes the node of [message] and routes the result.
         *
         * The graph start and end nodes (matched by id) are not passed to [executeNode]; they yield
         * [EdgeLabel.SUCCESS] directly. After an end node the actor channel is closed; otherwise
         * outgoing edges not matching the resulting label receive SKIP and matching ones EXECUTE.
         */
        suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
            val node = message.node
            node.status = NodeStatus.EXECUTING
            val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
                || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
                EdgeLabel.SUCCESS
            } else {
                log.debug("##### Processing workflow($workflowId) node($node) #####")
                // Call the Extension function and get the next Edge state.
                executeNode(node, message.nodeInput, message.nodeOutput)
            }
            // Update Node Completed
            node.status = NodeStatus.EXECUTED
            log.info("Execute Node($node) -> Executed State($nodeState)")

            // If End Node, Send End Message
            if (graph.isEndNode(node)) {
                // Close the current channel
                channel.close()
            } else {
                val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
                log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
                // Process Skip Edges
                skippingEdges.forEach { skippingEdge ->
                    // Prepare next node ready message and Send NodeReadyMessage
                    val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
                    sendNodeMessage(nodeReadyMessage)
                }
                // Process Success Node
                processNextNodes(node, nodeState)
            }
        }

        /**
         * Skips the node of [message] when none of its incoming edges is still NOT_STARTED, marks it
         * SKIPPED and propagates SKIP along all of its outgoing edges. Otherwise does nothing.
         */
        suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
            val node = message.node
            val incomingEdges = graph.incomingEdges(node.id)
            // Check All Incoming Nodes Skipped
            val nonSkippedEdges = incomingEdges.filter {
                it.status == EdgeStatus.NOT_STARTED
            }
            log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")

            if (nonSkippedEdges.isEmpty()) {
                log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
                // Call the Extension Function
                val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
                log.info("Skip Node($node) -> Executed State($nodeState)")
                // Mark the Current node as Skipped
                node.status = NodeStatus.SKIPPED
                // Look for next possible skip nodes
                graph.outgoingEdges(node.id).forEach { outgoingEdge ->
                    val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
                    sendNodeMessage(nodeReadyMessage)
                }
            }
        }

        /** Not implemented yet. */
        fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
            TODO()
        }

        /** Closes the node channel and fails the launched coroutine; not dispatched by the loop below. */
        fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
            channel.close()
            throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
        }

        /** Process each actor message received based on type **/
        consumeEach { nodeMessage ->
            when (nodeMessage) {
                is NodeReadyMessage<In, Out> -> {
                    // Blocking call: handled inline in the actor loop, one ready message at a time
                    try {
                        readyNodeWorker(nodeMessage)
                    } catch (e: Exception) {
                        channel.close(e)
                    }
                }
                is NodeExecuteMessage<In, Out> -> {
                    launch {
                        try {
                            executeNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            channel.close(e)
                        }
                    }
                }
                is NodeSkipMessage<In, Out> -> {
                    launch {
                        try {
                            skipNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            channel.close(e)
                        }
                    }
                }
                is NodeRestartMessage<In, Out> -> {
                    launch {
                        try {
                            restartNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            channel.close(e)
                        }
                    }
                }
                is NodeCancelMessage<In, Out> -> {
                    // Node cancellation is not implemented; the message is ignored.
                    // The explicit branch keeps this `when` exhaustive over NodeMessage.
                }
            }
        }
    }

    /**
     * Stores [graph] and the id of [bluePrintRuntimeService] on this instance, then sends a
     * [WorkflowExecuteMessage] carrying [input] and [output] to a newly created workflow actor.
     * The workflow result is delivered through [output].
     */
    override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
                                         input: In, output: CompletableDeferred<Out>) {
        log.info("Executing Graph : $graph")
        this.graph = graph
        this.workflowId = bluePrintRuntimeService.id()
        val startMessage = WorkflowExecuteMessage(input, output)
        workflowActor().send(startMessage)
    }
}
338     }
339 }