2 * Copyright © 2019 IBM.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
19 import kotlinx.coroutines.CompletableDeferred
20 import kotlinx.coroutines.coroutineScope
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
24 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
25 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
26 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
27 import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
28 import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
29 import org.onap.ccsdk.cds.controllerblueprints.core.checkNotEmpty
30 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
31 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
32 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintWorkflowExecutionService
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBlueprintWorkFlowService
35 import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintRuntimeService
36 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
37 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
38 import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
39 import org.springframework.stereotype.Service
40 import kotlin.coroutines.CoroutineContext
42 @Service("imperativeWorkflowExecutionService")
43 class ImperativeWorkflowExecutionService(
44 private val nodeTemplateExecutionService: NodeTemplateExecutionService
46 BlueprintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
48 override suspend fun executeBlueprintWorkflow(
49 bluePrintRuntimeService: BlueprintRuntimeService<*>,
50 executionServiceInput: ExecutionServiceInput,
51 properties: MutableMap<String, Any>
52 ): ExecutionServiceOutput {
54 val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
56 val workflowName = executionServiceInput.actionIdentifiers.actionName
58 val graph = bluePrintContext.workflowByName(workflowName).asGraph()
60 return coroutineScope {
61 ImperativeBlueprintWorkflowService(
62 nodeTemplateExecutionService,
63 this.coroutineContext[MDCContext]
65 }.let { it.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput) }
69 open class ImperativeBlueprintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService, private val mdcContext: CoroutineContext?) :
70 AbstractBlueprintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
72 final override val coroutineContext: CoroutineContext
73 get() = mdcContext?.let { super.coroutineContext + it } ?: super.coroutineContext
75 val log = logger(ImperativeBlueprintWorkflowService::class)
77 lateinit var bluePrintRuntimeService: BlueprintRuntimeService<*>
78 lateinit var executionServiceInput: ExecutionServiceInput
79 lateinit var workflowName: String
81 override suspend fun executeWorkflow(
83 bluePrintRuntimeService: BlueprintRuntimeService<*>,
84 input: ExecutionServiceInput
85 ): ExecutionServiceOutput {
87 this.bluePrintRuntimeService = bluePrintRuntimeService
88 this.executionServiceInput = input
89 this.workflowName = this.executionServiceInput.actionIdentifiers.actionName
90 this.workflowId = bluePrintRuntimeService.id()
91 val output = CompletableDeferred<ExecutionServiceOutput>()
92 val startMessage = WorkflowExecuteMessage(input, output)
93 val workflowActor = workflowActor()
94 if (!workflowActor.isClosedForSend) {
95 workflowActor.send(startMessage)
97 throw BlueprintProcessorException("workflow($workflowActor) actor is closed")
102 override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel {
103 return EdgeLabel.SUCCESS
106 override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput {
107 val wfStatus = Status().apply {
108 if (exceptions.isNotEmpty()) {
110 val errorMessage = it.message ?: ""
111 bluePrintRuntimeService.getBlueprintError().addError(errorMessage)
112 log.error("workflow($workflowId) exception :", it)
114 message = BlueprintConstants.STATUS_FAILURE
116 message = BlueprintConstants.STATUS_SUCCESS
118 eventType = EventType.EVENT_COMPONENT_EXECUTED.name
120 return ExecutionServiceOutput().apply {
121 commonHeader = executionServiceInput.commonHeader
122 actionIdentifiers = executionServiceInput.actionIdentifiers
127 override suspend fun prepareNodeExecutionMessage(node: Graph.Node):
128 NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
129 val nodeOutput = ExecutionServiceOutput().apply {
130 commonHeader = executionServiceInput.commonHeader
131 actionIdentifiers = executionServiceInput.actionIdentifiers
133 return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
136 override suspend fun prepareNodeSkipMessage(node: Graph.Node):
137 NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
138 val nodeOutput = ExecutionServiceOutput().apply {
139 commonHeader = executionServiceInput.commonHeader
140 actionIdentifiers = executionServiceInput.actionIdentifiers
142 return NodeSkipMessage(node, executionServiceInput, nodeOutput)
145 override suspend fun executeNode(
147 nodeInput: ExecutionServiceInput,
148 nodeOutput: ExecutionServiceOutput
150 log.info("Executing workflow($workflowName[${this.workflowId}])'s step(${node.id})")
151 val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
152 checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
153 val nodeTemplateName = step.target!!
155 /** execute node template */
156 val executionServiceOutput = nodeTemplateExecutionService
157 .executeNodeTemplate(bluePrintRuntimeService, nodeTemplateName, nodeInput)
159 return when (executionServiceOutput.status.message) {
160 BlueprintConstants.STATUS_FAILURE -> EdgeLabel.FAILURE
161 else -> EdgeLabel.SUCCESS
165 override suspend fun skipNode(
167 nodeInput: ExecutionServiceInput,
168 nodeOutput: ExecutionServiceOutput
170 return EdgeLabel.SUCCESS
173 override suspend fun cancelNode(
175 nodeInput: ExecutionServiceInput,
176 nodeOutput: ExecutionServiceOutput
178 TODO("not implemented")
181 override suspend fun restartNode(
183 nodeInput: ExecutionServiceInput,
184 nodeOutput: ExecutionServiceOutput
186 TODO("not implemented")