2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
19 import kotlinx.coroutines.*
20 import kotlinx.coroutines.channels.Channel
21 import kotlinx.coroutines.channels.actor
22 import kotlinx.coroutines.channels.consumeEach
23 import org.onap.ccsdk.cds.controllerblueprints.core.*
24 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
25 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeStatus
26 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
27 import org.onap.ccsdk.cds.controllerblueprints.core.data.NodeStatus
28 import kotlin.coroutines.CoroutineContext
/**
 * Contract for running an imperative workflow [Graph], parameterised by the workflow input type [In]
 * and the workflow output type [Out]. The abstract implementation later in this file drives these
 * callbacks from its workflow and node actors.
 */
30 interface BluePrintWorkFlowService<In, Out> {
32 /** Executes imperative workflow graph [graph] for the bluePrintRuntimeService [bluePrintRuntimeService]
33 * and workflow input [input]; the response will be retrieved from the output [output]. */
34 suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
35 input: In, output: CompletableDeferred<Out>)
/** Called with the workflow input before the start node is dispatched; the workflow actor in this file
 * does not use the returned [EdgeLabel]. */
37 suspend fun initializeWorkflow(input: In): EdgeLabel
/** Builds the value used to complete the workflow's output deferred. [exception] is null when the node
 * actor's channel was closed without a cause, otherwise it wraps that cause. */
39 suspend fun prepareWorkflowOutput(exception: BluePrintProcessorException?): Out
41 /** Prepare the message for the Node */
42 suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<In, Out>
/** Prepare the skip message for [node]; used when a node is reached through a SKIP edge action. */
44 suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<In, Out>
/** Executes [node]. The node actor's own comment describes the returned [EdgeLabel] as the next edge state
 * used to pick the outgoing edges to follow. */
46 suspend fun executeNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
/** Invoked by the node actor when [node] is skipped; the returned label is only logged there. */
48 suspend fun skipNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
/** Cancel hook for [node]. NOTE(review): no call site is visible in this file view. */
50 suspend fun cancelNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
/** Restart hook for [node]. NOTE(review): no call site is visible in this file view. */
52 suspend fun restartNode(node: Graph.Node, nodeInput: In, nodeOutput: Out): EdgeLabel
56 /** Workflow Message Types */
/** Base type of the messages consumed by the workflow-level actor. */
57 sealed class WorkflowMessage<In, Out>
/** Requests execution of the workflow with [input]; the workflow actor delivers the result by completing [output]. */
59 class WorkflowExecuteMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
/** Cancel request carrying the same [input]/[output] pair as an execute message.
 * NOTE(review): the handling of this message is not visible in this file view. */
61 class WorkflowCancelMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
/** Restart request carrying the same [input]/[output] pair as an execute message.
 * NOTE(review): the handling of this message is not visible in this file view. */
63 class WorkflowRestartMessage<In, Out>(val input: In, val output: CompletableDeferred<Out>) : WorkflowMessage<In, Out>()
65 /** Node Message Types */
/** Base type of the messages consumed by the node-level actor. */
66 sealed class NodeMessage<In, Out>
/** Signals that the incoming edge [fromEdge] has been resolved with [edgeAction]; the node concerned is
 * the target of [fromEdge]. */
68 class NodeReadyMessage<In, Out>(val fromEdge: Graph.Edge, val edgeAction: EdgeAction) : NodeMessage<In, Out>()
/** Asks the node actor to execute [node]; [nodeInput] and [nodeOutput] are passed through to executeNode. */
70 class NodeExecuteMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/** Asks the node actor to restart [node]; it is routed to the restart worker, whose body is not visible here. */
72 class NodeRestartMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/** Asks the node actor to skip [node]; [nodeInput] and [nodeOutput] are passed through to skipNode. */
74 class NodeSkipMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/** Cancel message for [node]. NOTE(review): no branch handling this type is visible in the node actor. */
76 class NodeCancelMessage<In, Out>(val node: Graph.Node, val nodeInput: In, val nodeOutput: Out) : NodeMessage<In, Out>()
/** Action attached to a [NodeReadyMessage] for an incoming edge. The constant list is not visible in this
 * view; the node actor references `EdgeAction.EXECUTE` and `EdgeAction.SKIP`. */
78 enum class EdgeAction(val id: String) {
83 /** Abstract workflow service implementation */
/**
 * Actor-based base implementation of [BluePrintWorkFlowService]. It is itself a [CoroutineScope], and the
 * workflow and node actors below are created with its [coroutineContext].
 */
84 abstract class AbstractBluePrintWorkFlowService<In, Out> : CoroutineScope, BluePrintWorkFlowService<In, Out> {
// Graph being executed; the actors read its start nodes and incoming/outgoing edges.
86 lateinit var graph: Graph
88 private val log = logger(AbstractBluePrintWorkFlowService::class)
// Job element of this scope's coroutineContext.
90 private val job = Job()
// Assigned from bluePrintRuntimeService.id() in executeWorkflow; used in log and cancellation messages.
92 lateinit var workflowId: String
// Context of this scope: the service job plus the coroutine name "Wf".
94 final override val coroutineContext: CoroutineContext
95 get() = job + CoroutineName("Wf")
// NOTE(review): the declaration enclosing the next two statements is not visible in this view; they log
// the cancel request and then throw a CancellationException naming the workflow.
98 log.info("Received workflow($workflowId) cancel request")
100 throw CancellationException("Workflow($workflowId) cancelled as requested")
/**
 * Creates the workflow-level actor: it runs in [coroutineContext] with an unlimited-capacity mailbox and
 * consumes [WorkflowMessage]s.
 */
103 fun workflowActor() = actor<WorkflowMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
104 /** Process the workflow execution message */
105 suspend fun executeMessageActor(workflowExecuteMessage: WorkflowExecuteMessage<In, Out>) {
// A new node actor is created for each execute message.
107 val nodeActor = nodeActor()
108 // Prepare Workflow and Populate the Initial store
// The EdgeLabel returned by initializeWorkflow is not used here.
109 initializeWorkflow(workflowExecuteMessage.input)
// Only the first start node returned by the graph is used.
111 val startNode = graph.startNodes().first()
112 // Prepare first node message and Send NodeExecuteMessage
113 // Start node doesn't wait for any nodes, so we can pass Execute message directly
114 val nodeExecuteMessage = prepareNodeExecutionMessage(startNode)
115 /** Send message from workflow actor to node actor */
117 nodeActor.send(nodeExecuteMessage)
119 // Wait for workflow completion or Error
// invokeOnClose registers a handler that runs when the node actor's channel is closed; `exception` is the
// close cause (null when closed without one).
120 nodeActor.invokeOnClose { exception ->
122 log.info("End Node Completed, processing completion message")
// Wrap the close cause, if any, so prepareWorkflowOutput receives a BluePrintProcessorException or null.
123 val bluePrintProcessorException: BluePrintProcessorException? =
124 if (exception != null) BluePrintProcessorException(exception) else null
126 val workflowOutput = prepareWorkflowOutput(bluePrintProcessorException)
// Deliver the result through the caller's deferred, then close this workflow actor's channel with the
// same cause.
127 workflowExecuteMessage.output.complete(workflowOutput)
128 channel.close(exception)
133 /** Process each actor message received based on type */
134 consumeEach { message ->
// Execute messages are handed to executeMessageActor; the bodies of the restart and cancel branches are
// not visible in this view.
136 is WorkflowExecuteMessage<In, Out> -> {
138 executeMessageActor(message)
141 is WorkflowRestartMessage<In, Out> -> {
146 is WorkflowCancelMessage<In, Out> -> {
/**
 * Creates the node-level actor: it runs in [coroutineContext] with an unlimited-capacity mailbox and
 * consumes [NodeMessage]s. The local functions declared inside are its message workers.
 */
156 private fun nodeActor() = actor<NodeMessage<In, Out>>(coroutineContext, Channel.UNLIMITED) {
158 /** Send message to process from one state to other state */
// Posts the message back onto this actor's own channel from a newly launched coroutine. The visible callers
// ignore the returned Job.
159 fun sendNodeMessage(nodeMessage: NodeMessage<In, Out>) = launch {
160 channel.send(nodeMessage)
163 /** Process the cascade node processing, based on the previous state of the node */
164 fun processNextNodes(node: Graph.Node, nodeState: EdgeLabel) {
165 // Process only Next Success Node
// Outgoing edges of the node for the single label nodeState (presumably the edges carrying that label —
// confirm against Graph.outgoingEdges).
166 val stateEdges = graph.outgoingEdges(node.id, arrayListOf(nodeState))
167 log.debug("Next Edges :$stateEdges")
168 if (stateEdges.isNotEmpty()) {
// Each selected edge is announced to this actor as ready with the EXECUTE action.
169 stateEdges.forEach { stateEdge ->
170 // Prepare next node ready message and Send NodeReadyMessage
171 val nodeReadyMessage = NodeReadyMessage<In, Out>(stateEdge, EdgeAction.EXECUTE)
172 sendNodeMessage(nodeReadyMessage)
/** Converts a ready message into a skip or execute message for the target node of its edge and posts that
 * message to this actor. */
177 suspend fun triggerToExecuteOrSkip(message: NodeReadyMessage<In, Out>) {
178 val edge = message.fromEdge
179 val node = edge.target
180 // Check if current edge action is Skip or Execute
181 when (message.edgeAction) {
// NOTE(review): the branch header for the next two statements is not visible in this view; they prepare
// and send the skip message.
183 val skipMessage = prepareNodeSkipMessage(node)
184 sendNodeMessage(skipMessage)
186 EdgeAction.EXECUTE -> {
187 val nodeExecuteMessage = prepareNodeExecutionMessage(node)
188 sendNodeMessage(nodeExecuteMessage)
/** Handles a ready message: records the action on the incoming edge, then decides whether the edge's
 * target node can be triggered now or has to wait for its other incoming edges. */
193 suspend fun readyNodeWorker(message: NodeReadyMessage<In, Out>) {
194 val edge = message.fromEdge
195 val node = edge.target
196 log.debug("@@@@@ Ready workflow($workflowId), node($node) from edge($edge) for action(${message.edgeAction}) @@@@@")
197 // Update the current incoming edge status to executed or skipped
198 when (message.edgeAction) {
199 EdgeAction.SKIP -> message.fromEdge.status = EdgeStatus.SKIPPED
200 EdgeAction.EXECUTE -> message.fromEdge.status = EdgeStatus.EXECUTED
202 val incomingEdges = graph.incomingEdges(node.id)
// A node with several incoming edges is triggered only once none of them is still NOT_STARTED.
203 if (incomingEdges.size > 1) {
204 // Check all incoming edges executed or skipped
205 val notCompletedEdges = incomingEdges.filter { it.status == EdgeStatus.NOT_STARTED }
206 if (notCompletedEdges.isEmpty()) {
207 // Possibility of skip edge action performed at last, but other edges have execute action.
// If at least one incoming edge was executed, the action is promoted to EXECUTE.
// NOTE(review): the alternative branch of this expression is not visible in this view.
208 val executePresent = incomingEdges.filter { it.status == EdgeStatus.EXECUTED }
209 val newMessage = if (executePresent.isNotEmpty()) {
210 NodeReadyMessage(message.fromEdge, EdgeAction.EXECUTE)
214 triggerToExecuteOrSkip(newMessage)
// Some incoming edges are still NOT_STARTED: only log, nothing is triggered for this message.
216 log.info("node(${node.id}) waiting for not completed edges($notCompletedEdges)")
// At most one incoming edge: trigger immediately with the action from the message.
219 triggerToExecuteOrSkip(message)
/** Handles an execute message: marks the node as executing, obtains the resulting edge label, marks the
 * node executed, and then sends ready messages for the node's outgoing edges. */
223 suspend fun executeNodeWorker(message: NodeExecuteMessage<In, Out>) {
224 val node = message.node
225 node.status = NodeStatus.EXECUTING
// The graph's start and end marker nodes are special-cased.
// NOTE(review): the value produced for that case is not visible in this view.
226 val nodeState = if (node.id == BluePrintConstants.GRAPH_START_NODE_NAME
227 || node.id == BluePrintConstants.GRAPH_END_NODE_NAME) {
230 log.debug("##### Processing workflow($workflowId) node($node) #####")
231 // Call the Extension function and get the next Edge state.
232 executeNode(node, message.nodeInput, message.nodeOutput)
234 // Update Node Completed
235 node.status = NodeStatus.EXECUTED
236 log.info("Execute Node($node) -> Executed State($nodeState)")
238 // If End Node, Send End Message
239 if (graph.isEndNode(node)) {
240 // Close the current channel
// NOTE(review): the statement that closes the channel is not visible in this view.
// Outgoing edges selected by outgoingEdgesNotInLabels for the label nodeState are announced with SKIP.
243 val skippingEdges = graph.outgoingEdgesNotInLabels(node.id, arrayListOf(nodeState))
244 log.debug("Skipping node($node) outgoing Edges($skippingEdges)")
245 // Process Skip Edges
246 skippingEdges.forEach { skippingEdge ->
247 // Prepare next node ready message and Send NodeReadyMessage
248 val nodeReadyMessage = NodeReadyMessage<In, Out>(skippingEdge, EdgeAction.SKIP)
249 sendNodeMessage(nodeReadyMessage)
251 // Process Success Node
252 processNextNodes(node, nodeState)
/** Handles a skip message: once no incoming edge of the node is still NOT_STARTED, it calls skipNode,
 * marks the node skipped, and sends SKIP ready messages for every outgoing edge. */
256 suspend fun skipNodeWorker(message: NodeSkipMessage<In, Out>) {
257 val node = message.node
258 val incomingEdges = graph.incomingEdges(node.id)
259 // Check All Incoming Nodes Skipped
// Despite its name, nonSkippedEdges holds the incoming edges whose status is still NOT_STARTED.
260 val nonSkippedEdges = incomingEdges.filter {
261 it.status == EdgeStatus.NOT_STARTED
263 log.debug("Node($node) incoming edges ($incomingEdges), not skipped incoming edges ($nonSkippedEdges)")
265 if (nonSkippedEdges.isEmpty()) {
266 log.debug("$$$$$ Skipping workflow($workflowId) node($node) $$$$$")
267 // Call the Extension Function
// The label returned by skipNode is only logged; it does not select the outgoing edges.
268 val nodeState = skipNode(node, message.nodeInput, message.nodeOutput)
269 log.info("Skip Node($node) -> Executed State($nodeState)")
270 // Mark the Current node as Skipped
271 node.status = NodeStatus.SKIPPED
272 // Look for next possible skip nodes
273 graph.outgoingEdges(node.id).forEach { outgoingEdge ->
274 val nodeReadyMessage = NodeReadyMessage<In, Out>(outgoingEdge, EdgeAction.SKIP)
275 sendNodeMessage(nodeReadyMessage)
// Restart worker: runs in a newly launched coroutine. NOTE(review): its body is not visible in this view.
280 fun restartNodeWorker(message: NodeRestartMessage<In, Out>) = launch {
// Cancel worker: takes a WorkflowCancelMessage (not a NodeCancelMessage) and runs in a newly launched
// coroutine; the visible statement throws a CancellationException naming the workflow.
// NOTE(review): no call to this function is visible in this view.
284 fun cancelNodeWorker(messageWorkflow: WorkflowCancelMessage<In, Out>) = launch {
286 throw CancellationException("Workflow($workflowId) actor cancelled as requested ...")
289 /** Process each actor message received based on type **/
// Each message type is routed to its worker, and every worker call is followed by a catch of Exception.
// NOTE(review): the `when`/`try` headers and the catch bodies are not visible in this view. Catching
// Exception also catches CancellationException, so confirm the catch bodies do not swallow cancellation.
290 consumeEach { nodeMessage ->
292 is NodeReadyMessage<In, Out> -> {
295 readyNodeWorker(nodeMessage)
296 } catch (e: Exception) {
300 is NodeExecuteMessage<In, Out> -> {
303 executeNodeWorker(nodeMessage)
304 } catch (e: Exception) {
309 is NodeSkipMessage<In, Out> -> {
312 skipNodeWorker(nodeMessage)
313 } catch (e: Exception) {
318 is NodeRestartMessage<In, Out> -> {
// restartNodeWorker returns the Job of a launched coroutine, which is not joined here.
321 restartNodeWorker(nodeMessage)
322 } catch (e: Exception) {
/** Entry point of the service: logs the graph, records the workflow id from [bluePrintRuntimeService],
 * and sends a [WorkflowExecuteMessage] to a newly created workflow actor. The result is delivered by the
 * workflow actor completing [output]. */
331 override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BluePrintRuntimeService<*>,
332 input: In, output: CompletableDeferred<Out>) {
333 log.info("Executing Graph : $graph")
// NOTE(review): no assignment of the `graph` property is visible in this view; confirm it is set before
// the actors read it.
335 this.workflowId = bluePrintRuntimeService.id()
336 val startMessage = WorkflowExecuteMessage(input, output)
// A new workflow actor is created on every call.
337 workflowActor().send(startMessage)