c2c7a60e91fb6710983b4799a5298844bc141a89
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2017-2018 AT&T Intellectual Property.
3  * Modifications Copyright © 2019 IBM.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
19
20 import io.grpc.stub.StreamObserver
21 import io.micrometer.core.instrument.MeterRegistry
22 import io.micrometer.core.instrument.Timer
23 import kotlinx.coroutines.Dispatchers
24 import kotlinx.coroutines.GlobalScope
25 import kotlinx.coroutines.launch
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
32 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
33 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
34 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
35 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.workflow.audit.StoreAuditService
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration
40 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
41 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
42 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
43 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
44 import org.slf4j.LoggerFactory
45 import org.springframework.stereotype.Service
46
47 @Service
48 class ExecutionServiceHandler(
49     private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
50     private val blueprintsProcessorCatalogService: BluePrintCatalogService,
51     private val bluePrintWorkflowExecutionService:
52         BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
53     private val publishAuditService: PublishAuditService,
54     private val storeAuditService: StoreAuditService,
55     private val meterRegistry: MeterRegistry
56 ) {
57
58     private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
59
60     suspend fun process(
61         executionServiceInput: ExecutionServiceInput,
62         responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
63     ) {
64         when {
65             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
66                 GlobalScope.launch(Dispatchers.Default) {
67                     val executionServiceOutput = doProcess(executionServiceInput)
68                     responseObserver.onNext(executionServiceOutput.toProto())
69                     responseObserver.onCompleted()
70                 }
71                 responseObserver.onNext(response(executionServiceInput).toProto())
72             }
73             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> {
74                 val executionServiceOutput = doProcess(executionServiceInput)
75                 responseObserver.onNext(executionServiceOutput.toProto())
76                 responseObserver.onCompleted()
77             }
78             else -> {
79                 publishAuditService.publishExecutionInput(executionServiceInput)
80                 val executionServiceOutput = response(
81                     executionServiceInput,
82                     "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
83                     true
84                 )
85                 meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
86                 publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
87                 responseObserver.onNext(
88                     executionServiceOutput.toProto()
89                 )
90             }
91         }
92     }
93
94     suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
95         val requestId = executionServiceInput.commonHeader.requestId
96         val actionIdentifiers = executionServiceInput.actionIdentifiers
97         val blueprintName = actionIdentifiers.blueprintName
98         val blueprintVersion = actionIdentifiers.blueprintVersion
99
100         lateinit var executionServiceOutput: ExecutionServiceOutput
101
102         log.info("processing request id $requestId")
103
104         // Audit input
105         publishAuditService.publishExecutionInput(executionServiceInput)
106
107         // store audit input details
108         val auditStoreId: Long = storeAuditService.storeExecutionInput(executionServiceInput)
109         log.info("StoreAuditService ID  $auditStoreId")
110
111         val sample = Timer.start()
112         try {
113             /** Check Blueprint is needed for this request */
114             if (checkServiceFunction(executionServiceInput)) {
115                 executionServiceOutput = executeServiceFunction(executionServiceInput)
116             } else {
117                 val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
118                 log.info("blueprint base path $basePath")
119
120                 val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
121
122                 executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
123                     blueprintRuntimeService,
124                     executionServiceInput, hashMapOf()
125                 )
126
127                 val errors = blueprintRuntimeService.getBluePrintError().allErrors()
128                 if (errors.isNotEmpty()) {
129                     setErrorStatus(errors.joinToString(", "), executionServiceOutput.status)
130                 }
131             }
132         } catch (e: Exception) {
133             log.error("fail processing request id $requestId", e)
134             executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
135         }
136         // Update process metrics
137         sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
138         meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
139         // Audit output
140         publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
141
142         // store audit input details
143         storeAuditService.storeExecutionOutput(
144             auditStoreId, executionServiceInput.correlationUUID,
145             executionServiceOutput
146         )
147         return executionServiceOutput
148     }
149
150     /** If the blueprint name is default, It means no blueprint is needed for the execution */
151     fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean {
152         return executionServiceInput.actionIdentifiers.blueprintName == "default"
153     }
154
155     /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */
156     suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
157         val actionName = executionServiceInput.actionIdentifiers.actionName
158         val instance = BluePrintDependencyService.instance<AbstractServiceFunction>(actionName)
159         checkNotNull(instance) { "failed to initialize service function($actionName)" }
160         instance.actionName = actionName
161         return instance.applyNB(executionServiceInput)
162     }
163
164     private fun setErrorStatus(errorMessage: String, status: Status) {
165         status.errorMessage = errorMessage
166         status.eventType = EventType.EVENT_COMPONENT_FAILURE.name
167         status.code = 500
168         status.message = BluePrintConstants.STATUS_FAILURE
169     }
170
171     private fun response(
172         executionServiceInput: ExecutionServiceInput,
173         errorMessage: String = "",
174         failure: Boolean = false
175     ): ExecutionServiceOutput {
176         val executionServiceOutput = ExecutionServiceOutput()
177         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
178         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
179         executionServiceOutput.payload = executionServiceInput.payload
180
181         val status = Status()
182         if (failure) {
183             setErrorStatus(errorMessage, status)
184         } else {
185             status.eventType = EventType.EVENT_COMPONENT_PROCESSING.name
186             status.code = 200
187             status.message = BluePrintConstants.STATUS_PROCESSING
188         }
189
190         executionServiceOutput.status = status
191
192         return executionServiceOutput
193     }
194 }