Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / blueprints / blueprint-core / src / main / kotlin / org / onap / ccsdk / cds / controllerblueprints / core / service / BluePrintWorkflowService.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
18
19 import kotlinx.coroutines.CancellationException
20 import kotlinx.coroutines.CompletableDeferred
21 import kotlinx.coroutines.CoroutineName
22 import kotlinx.coroutines.CoroutineScope
23 import kotlinx.coroutines.Job
24 import kotlinx.coroutines.channels.Channel
25 import kotlinx.coroutines.channels.actor
26 import kotlinx.coroutines.channels.consumeEach
27 import kotlinx.coroutines.launch
28 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
29 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
30 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
31 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
32 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
33 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
34 import org.onap.ccsdk.cds.controllerblueprints.core.incomingEdges
35 import org.onap.ccsdk.cds.controllerblueprints.core.isEndNode
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.core.outgoingEdges
38 import org.onap.ccsdk.cds.controllerblueprints.core.outgoingEdgesNotInLabels
39 import org.onap.ccsdk.cds.controllerblueprints.core.startNodes
40 import kotlin.coroutines.CoroutineContext
41
/**
 * Contract for an imperative workflow executor.
 *
 * [In] is the workflow/node input type and [Out] the workflow/node output type.
 * [AbstractBluePrintWorkFlowService] in this file supplies the graph traversal and calls the
 * functions below at the points described on each of them.
 */
interface BluePrintWorkFlowService<In, Out> {

    /**
     * Executes the imperative workflow graph [graph] for the runtime service
     * [bluePrintRuntimeService] with the workflow input [input] and returns the workflow output.
     */
    suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out

    /**
     * Prepares the workflow for [input].
     *
     * The abstract implementation calls this before the start node is dispatched and does not
     * use the returned [EdgeLabel].
     */
    suspend fun initializeWorkflow(input: In): EdgeLabel

    /**
     * Builds the workflow output.
     *
     * The abstract implementation calls this after the node actor's channel has been closed.
     */
    suspend fun prepareWorkflowOutput(): Out

    /** Builds the [NodeExecuteMessage] that asks for [node] to be executed. */
    suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>

    /** Builds the [NodeSkipMessage] that asks for [node] to be skipped. */
    suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>

    /**
     * Executes [node] with [nodeInput] and [nodeOutput].
     *
     * @return the edge label of the node result; the abstract implementation follows the
     * outgoing edges carrying this label and marks the remaining outgoing edges as skipped.
     */
    suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel

    /**
     * Skips [node].
     *
     * @return an edge label; the abstract implementation only logs it.
     */
    suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel

    /**
     * Cancels [node].
     *
     * NOTE(review): not invoked by the abstract implementation visible in this file.
     */
    suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel

    /**
     * Restarts [node].
     *
     * NOTE(review): not invoked by the abstract implementation visible in this file.
     */
    suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
}
65
/** Workflow message types: the messages accepted by the workflow actor. */
sealed class WorkflowMessage<In, Out>

/**
 * Requests execution of the workflow.
 *
 * @property input workflow input, handed to `initializeWorkflow`.
 * @property output deferred that the workflow actor completes with the workflow output.
 */
class WorkflowExecuteMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/**
 * Requests cancellation of the workflow.
 *
 * @property input workflow input.
 * @property output deferred intended to carry the workflow output.
 */
class WorkflowCancelMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/**
 * Requests a restart of the workflow.
 *
 * @property input workflow input.
 * @property output deferred intended to carry the workflow output.
 */
class WorkflowRestartMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
74
/** Node message types: the messages accepted by the node actor. */
sealed class NodeMessage<In, Out>

/**
 * Signals that the edge [fromEdge] has been resolved, so its target node may be ready.
 *
 * @property fromEdge the incoming edge that was just resolved.
 * @property edgeAction whether the edge was reached by execution or by skipping.
 */
class NodeReadyMessage<In, Out>(
    val fromEdge: Graph.Edge,
    val edgeAction: EdgeAction
) : NodeMessage<In, Out>()

/** Requests execution of [node] with [nodeInput] and [nodeOutput]. */
class NodeExecuteMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests a restart of [node] with [nodeInput] and [nodeOutput]. */
class NodeRestartMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests that [node] be skipped; carries [nodeInput] and [nodeOutput] for the skip callback. */
class NodeSkipMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/** Requests cancellation of [node] with [nodeInput] and [nodeOutput]. */
class NodeCancelMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()
87
/**
 * Action carried by a [NodeReadyMessage]: how the incoming edge of a node was resolved.
 *
 * @property id lower-case identifier of the action.
 */
enum class EdgeAction(val id: String) {

    /** The edge was followed because its source node produced the matching result. */
    EXECUTE("execute"),

    /** The edge was not followed and is propagated as skipped. */
    SKIP("skip")
}
92
93 /** Abstract workflow service implementation */
/**
 * Abstract workflow service implementation.
 *
 * Drives a [Graph] with two actors: the workflow actor returned by [workflowActor], which accepts
 * [WorkflowMessage]s, and a private node actor that moves nodes through ready / execute / skip
 * handling. Node specific work is delegated to the [BluePrintWorkFlowService] functions
 * implemented by subclasses.
 */
abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {

    /** Workflow graph walked by the node actor; must be assigned before a workflow is executed. */
    lateinit var graph: Graph

    private val log = logger(AbstractBluePrintWorkFlowService::class)

    /** Parent job of every coroutine and actor started through this scope. */
    private val job = Job()

    /** Identifier interpolated into this class's log and exception messages. */
    lateinit var workflowId: String

    /**
     * Failures collected while the workflow runs.
     *
     * Node handlers are launched concurrently and [coroutineContext] names no dispatcher, so
     * several coroutines may append at the same time; the list is therefore synchronized.
     */
    var exceptions: MutableList<Exception> = java.util.Collections.synchronizedList(arrayListOf<Exception>())

    override val coroutineContext: CoroutineContext
        get() = job + CoroutineName("Wf")

    /**
     * Cancels [job] and with it every coroutine started in this scope.
     *
     * @throws CancellationException always, after the job has been cancelled.
     */
    fun cancel() {
        log.info("Received workflow($workflowId) cancel request")
        job.cancel()
        throw CancellationException("Workflow($workflowId) cancelled as requested")
    }

    /**
     * Creates the workflow actor.
     *
     * A [WorkflowExecuteMessage] starts the node actor, initializes the workflow and dispatches
     * the start node. Its `output` is completed with the result of [prepareWorkflowOutput] once
     * the node actor's channel is closed, or completed exceptionally when start-up or output
     * preparation fails, so a caller awaiting it is never left waiting.
     * [WorkflowRestartMessage] and [WorkflowCancelMessage] are not implemented yet.
     */
    suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Process the workflow execution message */
        suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {

            val nodeActor = nodeActor()

            // Start node doesn't wait for any nodes, so an execute message can be prepared directly.
            val nodeExecuteMessage = try {
                // Prepare Workflow and Populate the Initial store
                initializeWorkflow(workflowExecuteMessage.input)
                val startNode = graph.startNodes().first()
                prepareNodeExecutionMessage(startNode)
            } catch (e: Exception) {
                // Nothing was dispatched: release the node actor, otherwise it would wait for
                // messages forever and keep the job active.
                nodeActor.close()
                throw e
            }

            /** Send message from workflow actor to node actor */
            launch {
                nodeActor.send(nodeExecuteMessage)
            }

            // Wait for workflow completion or Error
            nodeActor.invokeOnClose { exception ->
                launch {
                    if (exception != null) exceptions.add(BluePrintProcessorException(exception))
                    log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions")
                    try {
                        workflowExecuteMessage.output.complete(prepareWorkflowOutput())
                    } catch (e: Exception) {
                        // Hand the failure to the awaiting caller instead of leaving it suspended.
                        exceptions.add(e)
                        workflowExecuteMessage.output.completeExceptionally(e)
                    } finally {
                        channel.close()
                    }
                }
            }
        }

        /** Process each actor message received based on type */
        consumeEach { message ->
            when (message) {
                is WorkflowExecuteMessage<In, Out> -> {
                    launch {
                        try {
                            executeMessageActor(message)
                        } catch (e: Exception) {
                            exceptions.add(e)
                            // Start-up failed, so the completion handler was never registered:
                            // complete the caller's deferred here and stop the workflow actor.
                            message.output.completeExceptionally(e)
                            channel.close()
                        }
                    }
                }
                is WorkflowRestartMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }
                }
                is WorkflowCancelMessage<In, Out> -> {
                    launch {
                        TODO("")
                    }
                }
            }
        }
    }

    /**
     * Creates the node actor that walks [graph].
     *
     * Ready messages are handled inline, one at a time; execute, skip and restart messages are
     * each handled in their own coroutine. Any handler failure is recorded in [exceptions] and
     * closes the actor channel, which ends the workflow.
     */
    private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {

        /** Send message to process from one state to other state */
        fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
            channel.send(nodeMessage)
        }

        /** Announces every outgoing edge of [node] labelled [nodeState] as ready to execute. */
        fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
            graph.outgoingEdges(node.id, arrayListOf(nodeState)).forEach { stateEdge ->
                sendNodeMessage(NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE))
            }
        }

        /** Turns a ready message into the skip or execute message for the edge's target node. */
        suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
            val node = message.fromEdge.target
            val nextMessage: NodeMessage<In, Out> = when (message.edgeAction) {
                EdgeAction.SKIP -> prepareNodeSkipMessage(node)
                EdgeAction.EXECUTE -> prepareNodeExecutionMessage(node)
            }
            sendNodeMessage(nextMessage)
        }

        /**
         * Records the status of the incoming edge and triggers the target node once none of its
         * incoming edges is still [EdgeStatus.NOT_STARTED].
         */
        suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
            val edge = message.fromEdge
            val node = edge.target
            log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
            // Update the current incoming edge status to executed or skipped
            edge.status = when (message.edgeAction) {
                EdgeAction.SKIP -> EdgeStatus.SKIPPED
                EdgeAction.EXECUTE -> EdgeStatus.EXECUTED
            }

            val incomingEdges = graph.incomingEdges(node.id)
            if (incomingEdges.size <= 1) {
                // Nothing else to wait for.
                triggerToExecuteOrSkip(message)
                return
            }

            // Check all incoming edges executed or skipped
            val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
            if (notCompletedEdges.isNotEmpty()) {
                log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)")
                return
            }

            // The last edge to arrive may be a skip while another edge was executed;
            // an executed edge always wins.
            val newMessage = if (incomingEdges.any { it.status == EdgeStatus.EXECUTED }) {
                NodeReadyMessage<In, Out>(edge, EdgeAction.EXECUTE)
            } else {
                message
            }
            triggerToExecuteOrSkip(newMessage)
        }

        /**
         * Executes the node and cascades to its outgoing edges.
         *
         * @throws BluePrintProcessorException when a non-end node has no outgoing edge for the
         * label returned by the execution.
         */
        suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
            val node = message.node
            node.status = NodeStatus.EXECUTING

            // Graph start and end nodes carry no work of their own.
            val boundaryNode = node.id == BluePrintConstants.GRAPH_START_NODE_NAME ||
                node.id == BluePrintConstants.GRAPH_END_NODE_NAME
            val nodeState = if (boundaryNode) {
                EdgeLabel.SUCCESS
            } else {
                log.debug("##### Processing workflow($workflowId) node($node) #####")
                // Call the Extension function and get the next Edge state.
                executeNode(node, message.nodeInput, message.nodeOutput)
            }

            // Update Node Completed
            node.status = NodeStatus.EXECUTED
            log.info("Execute node(${node.id}) -> executed state($nodeState)")
            // Check if the Node status edge is there, If not close processing
            val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty()

            when {
                // End node reached: closing the channel ends the workflow.
                graph.isEndNode(node) -> channel.close()
                !edgePresent ->
                    throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.")
                else -> {
                    val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
                    log.debug("Skipping node($node)'s outgoing edges($skippingEdges)")
                    // Edges with any other label are propagated as skipped
                    skippingEdges.forEach { skippingEdge ->
                        sendNodeMessage(NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP))
                    }
                    // Edges with the resulting label are propagated as ready to execute
                    processNextNodes(node, nodeState)
                }
            }
        }

        /** Skips the node once no incoming edge is pending and propagates the skip downstream. */
        suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
            val node = message.node
            val incomingEdges = graph.incomingEdges(node.id)
            // Incoming edges that have not been resolved yet
            val nonSkippedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
            log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")

            if (nonSkippedEdges.isEmpty()) {
                log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
                // Call the Extension Function
                val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
                log.info("Skip node(${node.id}) -> executed state($nodeState)")
                // Mark the Current node as Skipped
                node.status = NodeStatus.SKIPPED
                // Every outgoing edge of a skipped node is skipped as well
                graph.outgoingEdges(node.id).forEach { outgoingEdge ->
                    sendNodeMessage(NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP))
                }
            }
        }

        /** Not implemented yet; starts a coroutine that fails with [NotImplementedError]. */
        fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
            TODO()
        }

        /** Closes the node channel and fails the launched coroutine; not referenced by the dispatch loop. */
        fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
            channel.close()
            throw CancellationException("Workflow($workflowId) actor cancelled as requested.")
        }

        /** Process each actor message received based on type **/
        consumeEach { nodeMessage ->
            when (nodeMessage) {
                is NodeReadyMessage<In, Out> -> {
                    // Handled inline so that edge status updates are processed one at a time
                    try {
                        readyNodeWorker(nodeMessage)
                    } catch (e: Exception) {
                        exceptions.add(e)
                        channel.close()
                    }
                }
                is NodeExecuteMessage<In, Out> -> {
                    launch {
                        try {
                            executeNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            nodeMessage.node.status = NodeStatus.TERMINATED
                            exceptions.add(e)
                            channel.close()
                        }
                    }
                }
                is NodeSkipMessage<In, Out> -> {
                    launch {
                        try {
                            skipNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            nodeMessage.node.status = NodeStatus.TERMINATED
                            exceptions.add(e)
                            channel.close()
                        }
                    }
                }
                is NodeRestartMessage<In, Out> -> {
                    launch {
                        // NOTE: restartNodeWorker only starts a coroutine, so a failure inside it
                        // is not observed by this catch.
                        try {
                            restartNodeWorker(nodeMessage)
                        } catch (e: Exception) {
                            exceptions.add(e)
                            channel.close()
                        }
                    }
                }
                is NodeCancelMessage<In, Out> -> {
                    // Cancelling a single node is not implemented; report it instead of dropping it silently.
                    log.info("workflow($workflowId) node(${nodeMessage.node.id}) cancel message ignored: not supported")
                }
            }
        }
    }
}