2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
19 import kotlinx.coroutines.CompletableDeferred
20 import kotlinx.coroutines.coroutineScope
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
24 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
25 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
26 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
27 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
28 import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
29 import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
30 import org.onap.ccsdk.cds.controllerblueprints.core.checkNotEmpty
31 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
32 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
33 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
34 import org.onap.ccsdk.cds.controllerblueprints.core.isAcyclic
35 import org.onap.ccsdk.cds.controllerblueprints.core.logger
36 import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBluePrintWorkFlowService
37 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
38 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
39 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
40 import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
41 import org.springframework.stereotype.Service
42 import kotlin.coroutines.CoroutineContext
/**
 * Spring service that runs a blueprint workflow through [ImperativeBluePrintWorkflowService].
 *
 * @param nodeTemplateExecutionService executor handed to the workflow service created for each request.
 */
@Service("imperativeWorkflowExecutionService")
class ImperativeWorkflowExecutionService(
    private val nodeTemplateExecutionService: NodeTemplateExecutionService
) : BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {

    /**
     * Resolves the workflow named by the request's action, converts it to a graph and executes it.
     *
     * @param bluePrintRuntimeService runtime whose blueprint context supplies the workflow definition.
     * @param executionServiceInput request; its `actionIdentifiers.actionName` is used as the workflow name.
     * @param properties not read by this implementation.
     * @return the output produced by [ImperativeBluePrintWorkflowService.executeWorkflow].
     * @throws BluePrintException when the workflow graph is not acyclic.
     */
    override suspend fun executeBluePrintWorkflow(
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        executionServiceInput: ExecutionServiceInput,
        properties: MutableMap<String, Any>
    ): ExecutionServiceOutput {
        val context = bluePrintRuntimeService.bluePrintContext()
        val actionName = executionServiceInput.actionIdentifiers.actionName
        val workflowGraph = context.workflowByName(actionName).asGraph()

        if (!workflowGraph.isAcyclic()) {
            throw BluePrintException("Imperative workflow must be acyclic. Check on_success/on_failure for circular references")
        }

        // The scope is opened only to read the MDC element of the calling coroutine;
        // the workflow itself is started after the scope has returned.
        val workflowService = coroutineScope {
            ImperativeBluePrintWorkflowService(nodeTemplateExecutionService, this.coroutineContext[MDCContext])
        }
        return workflowService.executeWorkflow(workflowGraph, bluePrintRuntimeService, executionServiceInput)
    }
}
/**
 * Imperative implementation of the hooks declared by [AbstractBluePrintWorkFlowService].
 *
 * Each executed step is resolved to its target node template and run through the supplied
 * [NodeTemplateExecutionService]; the step's status decides which [EdgeLabel] is returned.
 *
 * @param nodeTemplateExecutionService executor invoked for every step's target node template.
 * @param mdcContext optional context added on top of the inherited coroutine context; skipped when null.
 */
open class ImperativeBluePrintWorkflowService(
    private val nodeTemplateExecutionService: NodeTemplateExecutionService,
    private val mdcContext: CoroutineContext?
) : AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {

    /** Inherited coroutine context, extended with [mdcContext] when one was supplied. */
    final override val coroutineContext: CoroutineContext
        get() {
            val inherited = super.coroutineContext
            return if (mdcContext == null) inherited else inherited + mdcContext
        }

    val log = logger(ImperativeBluePrintWorkflowService::class)

    // Per-execution state; all three are assigned at the start of executeWorkflow.
    lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
    lateinit var executionServiceInput: ExecutionServiceInput
    lateinit var workflowName: String

    /**
     * Records the execution state on this instance, sends the start message to the workflow actor
     * and suspends until the workflow output has been completed.
     *
     * @param graph workflow graph to execute.
     * @param bluePrintRuntimeService runtime used by the steps; its id becomes the workflow id.
     * @param input request whose action name becomes the workflow name.
     * @return the completed workflow output.
     * @throws BluePrintProcessorException when the workflow actor is already closed for sending.
     */
    override suspend fun executeWorkflow(
        graph: Graph,
        bluePrintRuntimeService: BluePrintRuntimeService<*>,
        input: ExecutionServiceInput
    ): ExecutionServiceOutput {
        // NOTE(review): assumes the base class exposes a mutable `graph` property - confirm.
        this.graph = graph
        this.bluePrintRuntimeService = bluePrintRuntimeService
        this.executionServiceInput = input
        this.workflowName = this.executionServiceInput.actionIdentifiers.actionName
        this.workflowId = bluePrintRuntimeService.id()

        val pendingOutput = CompletableDeferred<ExecutionServiceOutput>()
        val startMessage = WorkflowExecuteMessage(input, pendingOutput)
        val workflowActor = workflowActor()

        if (workflowActor.isClosedForSend) {
            throw BluePrintProcessorException("workflow($workflowActor) actor is closed")
        }
        workflowActor.send(startMessage)
        return pendingOutput.await()
    }

    /** No initialization is required; always reports [EdgeLabel.SUCCESS]. */
    override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel = EdgeLabel.SUCCESS

    /**
     * Builds the final workflow output from the collected exceptions.
     *
     * Every collected exception is added to the runtime's blueprint error under the "workflow" key and
     * logged; the status message is failure when at least one exception exists, success otherwise.
     */
    override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput {
        val wfStatus = Status()
        if (exceptions.isEmpty()) {
            wfStatus.message = BluePrintConstants.STATUS_SUCCESS
        } else {
            for (exception in exceptions) {
                val errorMessage = exception.message ?: ""
                bluePrintRuntimeService.getBluePrintError().addError(errorMessage, "workflow")
                log.error("workflow($workflowId) exception :", exception)
            }
            wfStatus.message = BluePrintConstants.STATUS_FAILURE
        }
        wfStatus.eventType = EventType.EVENT_COMPONENT_EXECUTED.name

        val workflowOutput = ExecutionServiceOutput()
        workflowOutput.commonHeader = executionServiceInput.commonHeader
        workflowOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
        workflowOutput.status = wfStatus
        return workflowOutput
    }

    /** Creates the execute message for [node], pairing the workflow input with a fresh output that copies its headers. */
    override suspend fun prepareNodeExecutionMessage(
        node: Graph.Node
    ): NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
        val freshOutput = ExecutionServiceOutput().also { output ->
            output.commonHeader = executionServiceInput.commonHeader
            output.actionIdentifiers = executionServiceInput.actionIdentifiers
        }
        return NodeExecuteMessage(node, executionServiceInput, freshOutput)
    }

    /** Creates the skip message for [node], pairing the workflow input with a fresh output that copies its headers. */
    override suspend fun prepareNodeSkipMessage(
        node: Graph.Node
    ): NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
        val freshOutput = ExecutionServiceOutput().also { output ->
            output.commonHeader = executionServiceInput.commonHeader
            output.actionIdentifiers = executionServiceInput.actionIdentifiers
        }
        return NodeSkipMessage(node, executionServiceInput, freshOutput)
    }

    /**
     * Executes the node template targeted by the workflow step named after [node]'s id.
     *
     * @param node graph node whose id is used as the step name.
     * @param nodeInput input forwarded to the node template execution.
     * @param nodeOutput not read by this implementation.
     * @return [EdgeLabel.FAILURE] when the step's status message is the failure constant,
     * [EdgeLabel.SUCCESS] otherwise.
     */
    override suspend fun executeNode(
        node: Graph.Node,
        nodeInput: ExecutionServiceInput,
        nodeOutput: ExecutionServiceOutput
    ): EdgeLabel {
        log.info("Executing workflow($workflowName[${this.workflowId}])'s step(${node.id})")

        val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
        checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
        // Non-null assertion follows the emptiness check on the line above.
        val nodeTemplateName = step.target!!

        /** execute node template */
        val stepResult = nodeTemplateExecutionService.executeNodeTemplate(
            bluePrintRuntimeService,
            node.id,
            nodeTemplateName,
            nodeInput
        )

        return when (stepResult.status.message) {
            BluePrintConstants.STATUS_FAILURE -> {
                // Clear step errors so that the workflow does not fail
                bluePrintRuntimeService.getBluePrintError().stepErrors(node.id)?.clear()
                EdgeLabel.FAILURE
            }
            else -> EdgeLabel.SUCCESS
        }
    }

    /** Skipping performs no work; always reports [EdgeLabel.SUCCESS]. */
    override suspend fun skipNode(
        node: Graph.Node,
        nodeInput: ExecutionServiceInput,
        nodeOutput: ExecutionServiceOutput
    ): EdgeLabel = EdgeLabel.SUCCESS

    /** Cancellation is not supported; calling this always fails via [TODO]. */
    override suspend fun cancelNode(
        node: Graph.Node,
        nodeInput: ExecutionServiceInput,
        nodeOutput: ExecutionServiceOutput
    ): EdgeLabel = TODO("not implemented")

    /** Restart is not supported; calling this always fails via [TODO]. */
    override suspend fun restartNode(
        node: Graph.Node,
        nodeInput: ExecutionServiceInput,
        nodeOutput: ExecutionServiceOutput
    ): EdgeLabel = TODO("not implemented")
}