Renaming Files having BluePrint to have Blueprint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / selfservice / api / ExecutionServiceHandler.kt
1 /*
2  * Copyright © 2017-2018 AT&T Intellectual Property.
3  * Modifications Copyright © 2019 IBM.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
19
20 import io.grpc.stub.StreamObserver
21 import io.micrometer.core.instrument.MeterRegistry
22 import io.micrometer.core.instrument.Timer
23 import kotlinx.coroutines.Dispatchers
24 import kotlinx.coroutines.GlobalScope
25 import kotlinx.coroutines.launch
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
32 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
33 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
34 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
35 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
37 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
38 import org.onap.ccsdk.cds.controllerblueprints.core.config.BlueprintLoadConfiguration
39 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintCatalogService
40 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BlueprintWorkflowExecutionService
41 import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService
42 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BlueprintMetadataUtils
43 import org.slf4j.LoggerFactory
44 import org.springframework.stereotype.Service
45 import java.util.stream.Collectors
46
47 @Service
48 class ExecutionServiceHandler(
49     private val bluePrintLoadConfiguration: BlueprintLoadConfiguration,
50     private val blueprintsProcessorCatalogService: BlueprintCatalogService,
51     private val bluePrintWorkflowExecutionService:
52         BlueprintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
53     private val publishAuditService: PublishAuditService,
54     private val meterRegistry: MeterRegistry
55 ) {
56
57     private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
58
59     suspend fun process(
60         executionServiceInput: ExecutionServiceInput,
61         responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
62     ) {
63         when {
64             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
65                 GlobalScope.launch(Dispatchers.Default) {
66                     val executionServiceOutput = doProcess(executionServiceInput)
67                     responseObserver.onNext(executionServiceOutput.toProto())
68                     responseObserver.onCompleted()
69                 }
70                 responseObserver.onNext(response(executionServiceInput).toProto())
71             }
72             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> {
73                 val executionServiceOutput = doProcess(executionServiceInput)
74                 responseObserver.onNext(executionServiceOutput.toProto())
75                 responseObserver.onCompleted()
76             }
77             else -> {
78                 publishAuditService.publishExecutionInput(executionServiceInput)
79                 val executionServiceOutput = response(
80                     executionServiceInput,
81                     "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
82                     true
83                 )
84                 meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
85                 publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
86                 responseObserver.onNext(
87                     executionServiceOutput.toProto()
88                 )
89             }
90         }
91     }
92
93     suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
94         val requestId = executionServiceInput.commonHeader.requestId
95         val actionIdentifiers = executionServiceInput.actionIdentifiers
96         val blueprintName = actionIdentifiers.blueprintName
97         val blueprintVersion = actionIdentifiers.blueprintVersion
98
99         lateinit var executionServiceOutput: ExecutionServiceOutput
100
101         log.info("processing request id $requestId")
102
103         // Audit input
104         publishAuditService.publishExecutionInput(executionServiceInput)
105
106         val sample = Timer.start()
107         try {
108             /** Check Blueprint is needed for this request */
109             if (checkServiceFunction(executionServiceInput)) {
110                 executionServiceOutput = executeServiceFunction(executionServiceInput)
111             } else {
112                 val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
113                 log.info("blueprint base path $basePath")
114
115                 val blueprintRuntimeService = BlueprintMetadataUtils.getBlueprintRuntime(requestId, basePath.toString())
116
117                 executionServiceOutput = bluePrintWorkflowExecutionService.executeBlueprintWorkflow(
118                     blueprintRuntimeService,
119                     executionServiceInput, hashMapOf()
120                 )
121
122                 val errors = blueprintRuntimeService.getBlueprintError().errors
123                 if (errors.isNotEmpty()) {
124                     val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", "))
125                     setErrorStatus(errorMessage, executionServiceOutput.status)
126                 }
127             }
128         } catch (e: Exception) {
129             log.error("fail processing request id $requestId", e)
130             executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
131         }
132         // Update process metrics
133         sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
134         meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
135
136         // Audit output
137         publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
138         return executionServiceOutput
139     }
140
141     /** If the blueprint name is default, It means no blueprint is needed for the execution */
142     fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean {
143         return executionServiceInput.actionIdentifiers.blueprintName == "default"
144     }
145
146     /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */
147     suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
148         val actionName = executionServiceInput.actionIdentifiers.actionName
149         val instance = BlueprintDependencyService.instance<AbstractServiceFunction>(actionName)
150         checkNotNull(instance) { "failed to initialize service function($actionName)" }
151         instance.actionName = actionName
152         return instance.applyNB(executionServiceInput)
153     }
154
155     private fun setErrorStatus(errorMessage: String, status: Status) {
156         status.errorMessage = errorMessage
157         status.eventType = EventType.EVENT_COMPONENT_FAILURE.name
158         status.code = 500
159         status.message = BlueprintConstants.STATUS_FAILURE
160     }
161
162     private fun response(
163         executionServiceInput: ExecutionServiceInput,
164         errorMessage: String = "",
165         failure: Boolean = false
166     ): ExecutionServiceOutput {
167         val executionServiceOutput = ExecutionServiceOutput()
168         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
169         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
170         executionServiceOutput.payload = executionServiceInput.payload
171
172         val status = Status()
173         if (failure) {
174             setErrorStatus(errorMessage, status)
175         } else {
176             status.eventType = EventType.EVENT_COMPONENT_PROCESSING.name
177             status.code = 200
178             status.message = BlueprintConstants.STATUS_PROCESSING
179         }
180
181         executionServiceOutput.status = status
182
183         return executionServiceOutput
184     }
185 }