2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
/**
 * Contract of a workflow service with workflow input type [In] and output type [Out].
 * Apart from the [executeWorkflow] entry point it declares the hooks that
 * AbstractBluePrintWorkFlowService (same file) calls while it walks the graph.
 */
30 interface BluePrintWorkFlowService<In, Out> {
32 /** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService]
33 * and workflow input [input]*/
34 suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>, input: In): Out
/**
 * Prepares the workflow for [input]. The workflow actor calls this before it sends the
 * first node message and does not use the returned [EdgeLabel] there.
 */
36 suspend fun initializeWorkflow(input: In): EdgeLabel
/** Builds the value used to complete the workflow's output once the node actor has closed. */
38 suspend fun prepareWorkflowOutput(): Out
40 /** Prepare the message for the Node */
41 suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
/** Builds the skip message for [node]; the node actor sends it from triggerToExecuteOrSkip. */
43 suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
/**
 * Executes [node] with [nodeInput] / [nodeOutput]. The node actor uses the returned label
 * to look up the outgoing edges of [node] that should be followed.
 */
45 suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
/** Skip hook for [node]; the node actor only logs the returned label. */
47 suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
/** Cancel hook for [node]. NOTE(review): no call site is visible in this part of the file - confirm usage. */
49 suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
/** Restart hook for [node]. NOTE(review): no call site is visible in this part of the file - confirm usage. */
51 suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
55 /** Workflow Message Types */
// Closed set of messages accepted by the workflow actor (see workflowActor).
56 sealed class WorkflowMessage<In, Out>
/** Request to run the workflow with [input]; [output] is completed with the prepared workflow output. */
58 class WorkflowExecuteMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
/** Request to cancel the workflow. NOTE(review): the handling branch body is not visible in this excerpt. */
60 class WorkflowCancelMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
/** Request to restart the workflow. NOTE(review): the handling branch body is not visible in this excerpt. */
62 class WorkflowRestartMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
64 /** Node Message Types */
// Closed set of messages accepted by the node actor (see nodeActor).
65 sealed class NodeMessage<In, Out>
/** Reports that [fromEdge] has been resolved with [edgeAction]; the node concerned is the edge's target. */
67 class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
/** Asks the node actor to execute [node] with the given input and output holders. */
69 class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/** Asks the node actor to restart [node]. */
71 class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/** Asks the node actor to skip [node]. */
73 class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/** Cancel message for [node]. NOTE(review): no handler taking this type is visible in this excerpt. */
75 class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/**
 * Action attached to a [NodeReadyMessage]. The node actor references the entries
 * EXECUTE and SKIP; each entry carries a string [id].
 */
77 enum class EdgeAction(val id: String) {
82 /** Abstract workflow service implementation */
// The class is its own CoroutineScope; both actors below are started on [coroutineContext].
83 abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {
// Graph walked by the actors; lateinit, so it has to be assigned before an actor reads it.
85 lateinit var graph: Graph
87 private val log = logger(AbstractBluePrintWorkFlowService::class)
// Job that forms part of coroutineContext for this scope.
89 private val job = Job()
// Identifier used in the log and exception messages; lateinit, must be set before use.
91 lateinit var workflowId: String
// Collects failures; the workflow actor appends the node actor's close cause here.
93 var exceptions: MutableList<Exception> = arrayListOf()
// Context is rebuilt on every access: the shared job plus the coroutine name "Wf".
95 final override val coroutineContext: CoroutineContext
96 get() = job + CoroutineName("Wf")
99 log.info("Received workflow($workflowId) cancel request")
101 throw CancellationException("Workflow($workflowId) cancelled as requested")
/**
 * Creates the workflow actor: an actor over [WorkflowMessage] started on [coroutineContext]
 * with an unlimited channel. Each received message is dispatched by its type.
 */
104 suspend fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
105 /** Process the workflow execution message */
106 suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
// A fresh node actor is created for every execute message.
108 val nodeActor = nodeActor()
109 // Prepare Workflow and Populate the Initial store
110 initializeWorkflow(workflowExecuteMessage.input)
// first() throws NoSuchElementException when the graph reports no start node.
112 val startNode = graph.startNodes().first()
113 // Prepare first node message and Send NodeExecuteMessage
114 // Start node doesn't wait for any nodes, so we can pass Execute message directly
115 val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
116 /** Send message from workflow actor to node actor */
118 nodeActor.send(nodeExecuteMessage)
120 // Wait for workflow completion or Error
// The handler runs when the node actor's channel is closed; a non-null cause is recorded
// as a BluePrintProcessorException before the output is completed.
121 nodeActor.invokeOnClose { exception ->
123 if (exception != null) exceptions.add(BluePrintProcessorException(exception))
124 log.info("workflow($workflowId) nodes completed with (${exceptions.size})exceptions")
125 val workflowOutput = prepareWorkflowOutput()
126 workflowExecuteMessage.output.complete(workflowOutput)
132 /** Process each actor message received based on type */
133 consumeEach { message ->
135 is WorkflowExecuteMessage<In, Out> -> {
138 executeMessageActor(message)
139 } catch (e: Exception) {
// NOTE(review): the bodies of the catch, restart and cancel branches are not visible in this excerpt.
144 is WorkflowRestartMessage<In, Out> -> {
149 is WorkflowCancelMessage<In, Out> -> {
159 private suspend fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
161 /** Send message to process from one state to other state */
// Launches a new coroutine that sends [nodeMessage] to this actor's own channel,
// so the caller does not suspend on the send; the launched Job is returned.
162 fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
163 channel.send(nodeMessage)
166 /** Cascade processing to the next nodes, based on the state the given node finished with */
167 fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
168 // Follow only the outgoing edges whose label matches nodeState
169 val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
170 if (stateEdges.isNotEmpty()) {
171 stateEdges.forEach { stateEdge ->
172 // Prepare next node ready message and Send NodeReadyMessage
// Every matching edge is announced with the EXECUTE action.
173 val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
174 sendNodeMessage(nodeReadyMessage)
/**
 * Turns a ready message into the follow-up message for the edge's target node:
 * a skip message or an execute message, chosen by the message's edge action.
 */
179 suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
180 val edge = message.fromEdge
181 val node = edge.target
182 // Check if current edge action is Skip or Execute
183 when (message.edgeAction) {
// NOTE(review): the label of this first branch is not visible in this excerpt - presumably EdgeAction.SKIP.
185 val skipMessage = prepareNodeSkipMessage(node)
186 sendNodeMessage(skipMessage)
188 EdgeAction.EXECUTE -> {
189 val nodeExecuteMessage = prepareNodeExecutionMessage(node)
190 sendNodeMessage(nodeExecuteMessage)
/**
 * Handles a [NodeReadyMessage]: records the action on the incoming edge and, once no
 * incoming edge of the target node is NOT_STARTED any more, triggers execute or skip.
 */
195 suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
196 val edge = message.fromEdge
197 val node = edge.target
198 log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
199 // Update the current incoming edge status to executed or skipped
200 when (message.edgeAction) {
201 EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
202 EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
204 val incomingEdges = graph.incomingEdges(node.id)
// Nodes with more than one incoming edge have to wait for all of them.
205 if (incomingEdges.size > 1) {
206 // Check all incoming edges executed or skipped
207 val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
208 if (notCompletedEdges.isEmpty()) {
209 // Possibility of skip edge action performed at last, but other edges have execute action.
210 val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
// One EXECUTED incoming edge is enough to build an EXECUTE message.
// NOTE(review): the alternative branch of this expression is not visible in this excerpt.
211 val newMessage = if (executePresent.isNotEmpty()) {
212 NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
216 triggerToExecuteOrSkip(newMessage)
218 log.info("node(${node.id}) is waiting for incoming edges($notCompletedEdges)")
// NOTE(review): presumably the branch for nodes without multiple incoming edges - the
// enclosing else is not visible; the received message is forwarded unchanged.
221 triggerToExecuteOrSkip(message)
/**
 * Handles a [NodeExecuteMessage]: marks the node EXECUTING, obtains the resulting edge
 * label, marks the node EXECUTED and then announces the outgoing edges as skipped or
 * ready, depending on whether their label matches the result.
 */
225 suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
226 val node = message.node
227 node.status = NodeStatus.EXECUTING
// Start and end nodes (matched by id) are handled by a separate branch.
// NOTE(review): that branch's body and the branch separator are not visible in this excerpt.
228 val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
229 || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
232 log.debug("##### Processing workflow($workflowId) node($node) #####")
233 // Call the Extension function and get the next Edge state.
234 executeNode(node, message.nodeInput, message.nodeOutput)
236 // Update Node Completed
237 node.status = NodeStatus.EXECUTED
238 log.info("Execute node(${node.id}) -> executed state($nodeState)")
239 // Check whether an outgoing edge exists for the resulting node state; if not, processing stops
240 val edgePresent = graph.outgoingEdges(node.id, nodeState).isNotEmpty()
242 // If End Node, Send End Message
243 if (graph.isEndNode(node)) {
244 // Close the current channel
// NOTE(review): the statement that closes the channel is not visible in this excerpt.
246 } else if (!edgePresent) {
247 throw BluePrintProcessorException("node(${node.id}) outgoing edge($nodeState) is missing.")
// Outgoing edges whose label differs from nodeState are announced with the SKIP action.
249 val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
250 log.debug("Skipping node($node)'s outgoing edges($skippingEdges)")
251 // Process Skip Edges
252 skippingEdges.forEach { skippingEdge ->
253 // Prepare next node ready message and Send NodeReadyMessage
254 val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
255 sendNodeMessage(nodeReadyMessage)
257 // Process Success Node
258 processNextNodes(node, nodeState)
/**
 * Handles a [NodeSkipMessage]: once none of the node's incoming edges is NOT_STARTED,
 * calls the skip hook, marks the node SKIPPED and announces every outgoing edge with SKIP.
 */
262 suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
263 val node = message.node
264 val incomingEdges = graph.incomingEdges(node.id)
265 // Collect the incoming edges that are still NOT_STARTED
266 val nonSkippedEdges = incomingEdges.filter {
267 it.status == EdgeStatus.NOT_STARTED
269 log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")
// Nothing happens in the visible code while at least one incoming edge is NOT_STARTED.
271 if (nonSkippedEdges.isEmpty()) {
272 log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
273 // Call the Extension Function
// The returned label is only logged; it does not select edges here.
274 val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
275 log.info("Skip node(${node.id}) -> executed state($nodeState)")
276 // Mark the Current node as Skipped
277 node.status = NodeStatus.SKIPPED
278 // Look for next possible skip nodes
279 graph.outgoingEdges(node.id).forEach { outgoingEdge ->
280 val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
281 sendNodeMessage(nodeReadyMessage)
// Handles a NodeRestartMessage in a newly launched coroutine.
// NOTE(review): the body is not visible in this excerpt.
286 fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
// Launches a coroutine that throws CancellationException for this workflow.
// NOTE(review): it takes a WorkflowCancelMessage, not a NodeCancelMessage, and no call
// site is visible in this excerpt - confirm the intended wiring.
290 fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
292 throw CancellationException("Workflow($workflowId) actor cancelled as requested.")
295 /** Process each actor message received based on type **/
296 consumeEach { nodeMessage ->
298 is NodeReadyMessage<In, Out> -> {
301 readyNodeWorker(nodeMessage)
302 } catch (e: Exception) {
307 is NodeExecuteMessage<In, Out> -> {
310 executeNodeWorker(nodeMessage)
311 } catch (e: Exception) {
312 nodeMessage.node.status = NodeStatus.TERMINATED
318 is NodeSkipMessage<In, Out> -> {
321 skipNodeWorker(nodeMessage)
322 } catch (e: Exception) {
323 nodeMessage.node.status = NodeStatus.TERMINATED
329 is NodeRestartMessage<In, Out> -> {
332 restartNodeWorker(nodeMessage)
333 } catch (e: Exception) {