Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / workflow-service / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / workflow / ImperativeWorkflowExecutionService.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.workflow
18
19 import kotlinx.coroutines.CompletableDeferred
20 import kotlinx.coroutines.coroutineScope
21 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
22 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
23 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
24 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
25 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
26 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
27 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
28 import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
29 import org.onap.ccsdk.cds.controllerblueprints.core.asGraph
30 import org.onap.ccsdk.cds.controllerblueprints.core.checkNotEmpty
31 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
32 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
33 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
34 import org.onap.ccsdk.cds.controllerblueprints.core.isAcyclic
35 import org.onap.ccsdk.cds.controllerblueprints.core.logger
36 import org.onap.ccsdk.cds.controllerblueprints.core.service.AbstractBluePrintWorkFlowService
37 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
38 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeExecuteMessage
39 import org.onap.ccsdk.cds.controllerblueprints.core.service.NodeSkipMessage
40 import org.onap.ccsdk.cds.controllerblueprints.core.service.WorkflowExecuteMessage
41 import org.springframework.stereotype.Service
42 import kotlin.coroutines.CoroutineContext
43
44 @Service("imperativeWorkflowExecutionService")
45 class ImperativeWorkflowExecutionService(
46     private val nodeTemplateExecutionService: NodeTemplateExecutionService
47 ) :
48     BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
49
50     override suspend fun executeBluePrintWorkflow(
51         bluePrintRuntimeService: BluePrintRuntimeService<*>,
52         executionServiceInput: ExecutionServiceInput,
53         properties: MutableMap<String, Any>
54     ): ExecutionServiceOutput {
55
56         val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
57
58         val workflowName = executionServiceInput.actionIdentifiers.actionName
59
60         val graph = bluePrintContext.workflowByName(workflowName).asGraph()
61
62         if (!graph.isAcyclic()) {
63             throw BluePrintException("Imperative workflow must be acyclic. Check on_success/on_failure for circular references")
64         }
65
66         return coroutineScope {
67             ImperativeBluePrintWorkflowService(
68                 nodeTemplateExecutionService,
69                 this.coroutineContext[MDCContext]
70             )
71         }.let { it.executeWorkflow(graph, bluePrintRuntimeService, executionServiceInput) }
72     }
73 }
74
75 open class ImperativeBluePrintWorkflowService(private val nodeTemplateExecutionService: NodeTemplateExecutionService, private val mdcContext: CoroutineContext?) :
76     AbstractBluePrintWorkFlowService<ExecutionServiceInput, ExecutionServiceOutput>() {
77
78     final override val coroutineContext: CoroutineContext
79         get() = mdcContext?.let { super.coroutineContext + it } ?: super.coroutineContext
80
81     val log = logger(ImperativeBluePrintWorkflowService::class)
82
83     lateinit var bluePrintRuntimeService: BluePrintRuntimeService<*>
84     lateinit var executionServiceInput: ExecutionServiceInput
85     lateinit var workflowName: String
86
87     override suspend fun executeWorkflow(
88         graph: Graph,
89         bluePrintRuntimeService: BluePrintRuntimeService<*>,
90         input: ExecutionServiceInput
91     ): ExecutionServiceOutput {
92         this.graph = graph
93         this.bluePrintRuntimeService = bluePrintRuntimeService
94         this.executionServiceInput = input
95         this.workflowName = this.executionServiceInput.actionIdentifiers.actionName
96         this.workflowId = bluePrintRuntimeService.id()
97         val output = CompletableDeferred<ExecutionServiceOutput>()
98         val startMessage = WorkflowExecuteMessage(input, output)
99         val workflowActor = workflowActor()
100         if (!workflowActor.isClosedForSend) {
101             workflowActor.send(startMessage)
102         } else {
103             throw BluePrintProcessorException("workflow($workflowActor) actor is closed")
104         }
105         return output.await()
106     }
107
108     override suspend fun initializeWorkflow(input: ExecutionServiceInput): EdgeLabel {
109         return EdgeLabel.SUCCESS
110     }
111
112     override suspend fun prepareWorkflowOutput(): ExecutionServiceOutput {
113         val wfStatus = Status().apply {
114             if (exceptions.isNotEmpty()) {
115                 exceptions.forEach {
116                     val errorMessage = it.message ?: ""
117                     bluePrintRuntimeService.getBluePrintError().addError(errorMessage, "workflow")
118                     log.error("workflow($workflowId) exception :", it)
119                 }
120                 message = BluePrintConstants.STATUS_FAILURE
121             } else {
122                 message = BluePrintConstants.STATUS_SUCCESS
123             }
124             eventType = EventType.EVENT_COMPONENT_EXECUTED.name
125         }
126         return ExecutionServiceOutput().apply {
127             commonHeader = executionServiceInput.commonHeader
128             actionIdentifiers = executionServiceInput.actionIdentifiers
129             status = wfStatus
130         }
131     }
132
133     override suspend fun prepareNodeExecutionMessage(node: Graph.Node):
134         NodeExecuteMessage<ExecutionServiceInput, ExecutionServiceOutput> {
135             val nodeOutput = ExecutionServiceOutput().apply {
136                 commonHeader = executionServiceInput.commonHeader
137                 actionIdentifiers = executionServiceInput.actionIdentifiers
138             }
139             return NodeExecuteMessage(node, executionServiceInput, nodeOutput)
140         }
141
142     override suspend fun prepareNodeSkipMessage(node: Graph.Node):
143         NodeSkipMessage<ExecutionServiceInput, ExecutionServiceOutput> {
144             val nodeOutput = ExecutionServiceOutput().apply {
145                 commonHeader = executionServiceInput.commonHeader
146                 actionIdentifiers = executionServiceInput.actionIdentifiers
147             }
148             return NodeSkipMessage(node, executionServiceInput, nodeOutput)
149         }
150
151     override suspend fun executeNode(
152         node: Graph.Node,
153         nodeInput: ExecutionServiceInput,
154         nodeOutput: ExecutionServiceOutput
155     ): EdgeLabel {
156         log.info("Executing workflow($workflowName[${this.workflowId}])'s step(${node.id})")
157         val step = bluePrintRuntimeService.bluePrintContext().workflowStepByName(this.workflowName, node.id)
158         checkNotEmpty(step.target) { "couldn't get step target for workflow(${this.workflowName})'s step(${node.id})" }
159         val nodeTemplateName = step.target!!
160
161         /** execute node template */
162         val executionServiceOutput = nodeTemplateExecutionService
163             .executeNodeTemplate(bluePrintRuntimeService, node.id, nodeTemplateName, nodeInput)
164
165         if (executionServiceOutput.status.message == BluePrintConstants.STATUS_FAILURE) {
166             // Clear step errors so that the workflow does not fail
167             bluePrintRuntimeService.getBluePrintError().stepErrors(node.id)?.clear()
168             return EdgeLabel.FAILURE
169         }
170
171         return EdgeLabel.SUCCESS
172     }
173
174     override suspend fun skipNode(
175         node: Graph.Node,
176         nodeInput: ExecutionServiceInput,
177         nodeOutput: ExecutionServiceOutput
178     ): EdgeLabel {
179         return EdgeLabel.SUCCESS
180     }
181
182     override suspend fun cancelNode(
183         node: Graph.Node,
184         nodeInput: ExecutionServiceInput,
185         nodeOutput: ExecutionServiceOutput
186     ): EdgeLabel {
187         TODO("not implemented")
188     }
189
190     override suspend fun restartNode(
191         node: Graph.Node,
192         nodeInput: ExecutionServiceInput,
193         nodeOutput: ExecutionServiceOutput
194     ): EdgeLabel {
195         TODO("not implemented")
196     }
197 }