2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
19 import kotlinx.coroutines.CancellationException
20 import kotlinx.coroutines.CompletableDeferred
21 import kotlinx.coroutines.CoroutineName
22 import kotlinx.coroutines.CoroutineScope
23 import kotlinx.coroutines.Job
24 import kotlinx.coroutines.channels.Channel
25 import kotlinx.coroutines.channels.actor
26 import kotlinx.coroutines.channels.consumeEach
27 import kotlinx.coroutines.launch
28 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
29 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
30 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
31 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
32 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
33 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
34 import org.onap.ccsdk.cds.controllerblueprints.core.incomingEdges
35 import org.onap.ccsdk.cds.controllerblueprints.core.isEndNode
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.core.outgoingEdges
38 import org.onap.ccsdk.cds.controllerblueprints.core.outgoingEdgesNotInLabels
39 import org.onap.ccsdk.cds.controllerblueprints.core.startNodes
40 import kotlin.coroutines.CoroutineContext
/**
 * Contract for a service that drives an imperative workflow [Graph].
 *
 * [In] is the type of the workflow/node input and [Out] the type of the workflow/node output.
 * All members are suspending so implementations may perform asynchronous work.
 */
interface BluePrintWorkFlowService<In, Out> {

    /**
     * Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService]
     * and workflow input [input].
     *
     * @return the workflow output.
     */
    suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out

    /**
     * Prepares the workflow for the given [input].
     *
     * The abstract implementation in this file invokes it once per execute request, before the message
     * for the start node is prepared.
     *
     * @return an [EdgeLabel] for the initialization step.
     * NOTE(review): how the label is meant to be interpreted is not established in this file - confirm.
     */
    suspend fun initializeWorkflow(input: In): EdgeLabel

    /**
     * Builds the workflow output.
     *
     * The abstract implementation in this file calls it after the node actor has closed and completes
     * the caller's deferred result with the returned value.
     */
    suspend fun prepareWorkflowOutput(): Out

    /** Prepares the [NodeExecuteMessage] (node plus its input and output holders) used to execute [node]. */
    suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>

    /** Prepares the [NodeSkipMessage] (node plus its input and output holders) used to skip [node]. */
    suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>

    /**
     * Executes [node] with [nodeInput] and [nodeOutput].
     *
     * @return the [EdgeLabel] produced by the execution.
     * NOTE(review): presumably this label selects the outgoing edges to follow - confirm against the caller.
     */
    suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel

    /**
     * Skips [node], receiving the same [nodeInput] and [nodeOutput] holders an execution would get.
     *
     * @return the [EdgeLabel] produced by the skip.
     */
    suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel

    /**
     * Cancels [node].
     *
     * NOTE(review): no call site is visible in this file; the expected semantics should be confirmed.
     *
     * @return the [EdgeLabel] produced by the cancellation.
     */
    suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel

    /**
     * Restarts [node].
     *
     * NOTE(review): no call site is visible in this file; the expected semantics should be confirmed.
     *
     * @return the [EdgeLabel] produced by the restart.
     */
    suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
}
/**
 * Workflow Message Types.
 *
 * Closed set of messages accepted by the workflow actor; the actor dispatches on the concrete subtype.
 */
sealed class WorkflowMessage<In, Out>

/**
 * Asks the workflow actor to run the workflow.
 *
 * @property input workflow input handed to the workflow initialization step.
 * @property output deferred that is completed with the prepared workflow output.
 */
class WorkflowExecuteMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/**
 * Asks the workflow actor to cancel the workflow.
 *
 * @property input workflow input carried with the request.
 * @property output deferred carried with the request for the workflow output.
 */
class WorkflowCancelMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()

/**
 * Asks the workflow actor to restart the workflow.
 *
 * @property input workflow input carried with the request.
 * @property output deferred carried with the request for the workflow output.
 */
class WorkflowRestartMessage<In, Out>(
    val input: In,
    val output: CompletableDeferred<Out>
) : WorkflowMessage<In, Out>()
/**
 * Node Message Types.
 *
 * Closed set of messages accepted by the node actor; the actor dispatches on the concrete subtype.
 */
sealed class NodeMessage<In, Out>

/**
 * Signals that [fromEdge] has been resolved, so its target node may become ready.
 *
 * @property fromEdge incoming edge that was just resolved; the node to process is the edge target.
 * @property edgeAction whether the edge was resolved by executing or by skipping.
 */
class NodeReadyMessage<In, Out>(
    val fromEdge: Graph.Edge,
    val edgeAction: EdgeAction
) : NodeMessage<In, Out>()

/**
 * Asks the node actor to execute [node].
 *
 * @property node node to execute.
 * @property nodeInput input passed to the node execution.
 * @property nodeOutput output holder passed to the node execution.
 */
class NodeExecuteMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/**
 * Asks the node actor to restart [node].
 *
 * @property node node to restart.
 * @property nodeInput input carried for the node.
 * @property nodeOutput output holder carried for the node.
 */
class NodeRestartMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/**
 * Asks the node actor to skip [node].
 *
 * @property node node to skip.
 * @property nodeInput input passed to the node skip handling.
 * @property nodeOutput output holder passed to the node skip handling.
 */
class NodeSkipMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()

/**
 * Asks for [node] to be cancelled.
 *
 * NOTE(review): no handler for this message type is visible in this file - confirm where it is consumed.
 *
 * @property node node to cancel.
 * @property nodeInput input carried for the node.
 * @property nodeOutput output holder carried for the node.
 */
class NodeCancelMessage<In, Out>(
    val node: Graph.Node,
    val nodeInput: In,
    val nodeOutput: Out
) : NodeMessage<In, Out>()
88 enum class EdgeAction(val id: String) {
93 /** Abstract workflow service implementation */
94 abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {
96 lateinit var graph: Graph
98 private val log = logger(AbstractBluePrintWorkFlowService::class)
100 private val job = Job()
102 lateinit var workflowId: String
104 var exceptions: MutableList<Exception> = arrayListOf()
106 final override val coroutineContext: CoroutineContext
107 get() = job + CoroutineName("Wf")
110 log.info("Received workflow($workflowId) cancel request")
112 throw CancellationException("Workflow($workflowId) cancelled as requested")
115 suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
116 /** Process the workflow execution message */
117 suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
119 val nodeActor = nodeActor()
120 // Prepare Workflow and Populate the Initial store
121 initializeWorkflow(workflowExecuteMessage.input)
123 val startNode = graph.startNodes().first()
124 // Prepare first node message and Send NodeExecuteMessage
125 // Start node doesn't wait for any nodes, so we can pass Execute message directly
126 val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
127 /** Send message from workflow actor to node actor */
129 nodeActor.send(nodeExecuteMessage)
131 // Wait for workflow completion or Error
132 nodeActor.invokeOnClose { exception ->
134 if (exception != null) exceptions.add(BluePrintProcessorException(exception))
135 log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions")
136 val workflowOutput = prepareWorkflowOutput()
137 workflowExecuteMessage.output.complete(workflowOutput)
143 /** Process each actor message received based on type */
144 consumeEach { message ->
146 is WorkflowExecuteMessage<In, Out> -> {
149 executeMessageActor(message)
150 } catch (e: Exception) {
155 is WorkflowRestartMessage<In, Out> -> {
160 is WorkflowCancelMessage<In, Out> -> {
169 private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
171 /** Send message to process from one state to other state */
172 fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
173 channel.send(nodeMessage)
176 /** Process the cascade node processing, based on the previous state of the node */
177 fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
178 // Process only Next Success Node
179 val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
180 if (stateEdges.isNotEmpty()) {
181 stateEdges.forEach { stateEdge ->
182 // Prepare next node ready message and Send NodeReadyMessage
183 val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
184 sendNodeMessage(nodeReadyMessage)
189 suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
190 val edge = message.fromEdge
191 val node = edge.target
192 // Check if current edge action is Skip or Execute
193 when (message.edgeAction) {
195 val skipMessage = prepareNodeSkipMessage(node)
196 sendNodeMessage(skipMessage)
198 EdgeAction.EXECUTE -> {
199 val nodeExecuteMessage = prepareNodeExecutionMessage(node)
200 sendNodeMessage(nodeExecuteMessage)
205 suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
206 val edge = message.fromEdge
207 val node = edge.target
208 log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
209 // Update the current incoming edge status to executed or skipped
210 when (message.edgeAction) {
211 EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
212 EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
214 val incomingEdges = graph.incomingEdges(node.id)
215 if (incomingEdges.size > 1) {
216 // Check all incoming edges executed or skipped
217 val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
218 if (notCompletedEdges.isEmpty()) {
219 // Possibility of skip edge action performed at last, but other edges have execute action.
220 val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
221 val newMessage = if (executePresent.isNotEmpty()) {
222 NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
226 triggerToExecuteOrSkip(newMessage)
228 log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)")
231 triggerToExecuteOrSkip(message)
235 suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
236 val node = message.node
237 node.status = NodeStatus.EXECUTING
238 val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME ||
239 node.id == BluePrintConstants.GRAPH_END_NODE_NAME
243 log.debug("##### Processing workflow($workflowId) node($node) #####")
244 // Call the Extension function and get the next Edge state.
245 executeNode(node, message.nodeInput, message.nodeOutput)
247 // Update Node Completed
248 node.status = NodeStatus.EXECUTED
249 log.info("Execute node(${node.id}) -> executed state($nodeState)")
250 // Check if the Node status edge is there, If not close processing
251 val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty()
253 // If End Node, Send End Message
254 if (graph.isEndNode(node)) {
255 // Close the current channel
257 } else if (!edgePresent) {
258 throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.")
260 val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
261 log.debug("Skipping node($node)'s outgoing edges($skippingEdges)")
262 // Process Skip Edges
263 skippingEdges.forEach { skippingEdge ->
264 // Prepare next node ready message and Send NodeReadyMessage
265 val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
266 sendNodeMessage(nodeReadyMessage)
268 // Process Success Node
269 processNextNodes(node, nodeState)
273 suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
274 val node = message.node
275 val incomingEdges = graph.incomingEdges(node.id)
276 // Check All Incoming Nodes Skipped
277 val nonSkippedEdges = incomingEdges.filter {
278 it.status == EdgeStatus.NOT_STARTED
280 log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")
282 if (nonSkippedEdges.isEmpty()) {
283 log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
284 // Call the Extension Function
285 val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
286 log.info("Skip node(${node.id}) -> executed state($nodeState)")
287 // Mark the Current node as Skipped
288 node.status = NodeStatus.SKIPPED
289 // Look for next possible skip nodes
290 graph.outgoingEdges(node.id).forEach { outgoingEdge ->
291 val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
292 sendNodeMessage(nodeReadyMessage)
297 fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
301 fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
303 throw CancellationException("Workflow($workflowId) actor cancelled as requested.")
306 /** Process each actor message received based on type **/
307 consumeEach { nodeMessage ->
309 is NodeReadyMessage<In, Out> -> {
312 readyNodeWorker(nodeMessage)
313 } catch (e: Exception) {
318 is NodeExecuteMessage<In, Out> -> {
321 executeNodeWorker(nodeMessage)
322 } catch (e: Exception) {
323 nodeMessage.node.status = NodeStatus.TERMINATED
329 is NodeSkipMessage<In, Out> -> {
332 skipNodeWorker(nodeMessage)
333 } catch (e: Exception) {
334 nodeMessage.node.status = NodeStatus.TERMINATED
340 is NodeRestartMessage<In, Out> -> {
343 restartNodeWorker(nodeMessage)
344 } catch (e: Exception) {