2 * Copyright © 2017-2018 AT&T Intellectual Property.
3 * Modifications Copyright © 2019 IBM.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
20 import io.grpc.stub.StreamObserver
21 import io.micrometer.core.instrument.MeterRegistry
22 import io.micrometer.core.instrument.Timer
23 import kotlinx.coroutines.Dispatchers
24 import kotlinx.coroutines.GlobalScope
25 import kotlinx.coroutines.launch
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
29 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
31 import org.onap.ccsdk.cds.blueprintsprocessor.core.utils.toProto
32 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.COUNTER_PROCESS
33 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants.TIMER_PROCESS
34 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.cbaMetricTags
35 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction
36 import org.onap.ccsdk.cds.blueprintsprocessor.functions.workflow.audit.StoreAuditService
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
38 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
39 import org.onap.ccsdk.cds.controllerblueprints.core.config.BluePrintLoadConfiguration
40 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
41 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintWorkflowExecutionService
42 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
43 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
44 import org.slf4j.LoggerFactory
45 import org.springframework.stereotype.Service
48 class ExecutionServiceHandler(
49 private val bluePrintLoadConfiguration: BluePrintLoadConfiguration,
50 private val blueprintsProcessorCatalogService: BluePrintCatalogService,
51 private val bluePrintWorkflowExecutionService:
52 BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>,
53 private val publishAuditService: PublishAuditService,
54 private val storeAuditService: StoreAuditService,
55 private val meterRegistry: MeterRegistry
58 private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString())
61 executionServiceInput: ExecutionServiceInput,
62 responseObserver: StreamObserver<org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput>
65 executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
66 GlobalScope.launch(Dispatchers.Default) {
67 val executionServiceOutput = doProcess(executionServiceInput)
68 responseObserver.onNext(executionServiceOutput.toProto())
69 responseObserver.onCompleted()
71 responseObserver.onNext(response(executionServiceInput).toProto())
73 executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> {
74 val executionServiceOutput = doProcess(executionServiceInput)
75 responseObserver.onNext(executionServiceOutput.toProto())
76 responseObserver.onCompleted()
79 publishAuditService.publishExecutionInput(executionServiceInput)
80 val executionServiceOutput = response(
81 executionServiceInput,
82 "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
85 meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
86 publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
87 responseObserver.onNext(
88 executionServiceOutput.toProto()
94 suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
95 val requestId = executionServiceInput.commonHeader.requestId
96 val actionIdentifiers = executionServiceInput.actionIdentifiers
97 val blueprintName = actionIdentifiers.blueprintName
98 val blueprintVersion = actionIdentifiers.blueprintVersion
100 lateinit var executionServiceOutput: ExecutionServiceOutput
102 log.info("processing request id $requestId")
105 publishAuditService.publishExecutionInput(executionServiceInput)
107 // store audit input details
108 val auditStoreId: Long = storeAuditService.storeExecutionInput(executionServiceInput)
109 log.info("StoreAuditService ID $auditStoreId")
111 val sample = Timer.start()
113 /** Check Blueprint is needed for this request */
114 if (checkServiceFunction(executionServiceInput)) {
115 executionServiceOutput = executeServiceFunction(executionServiceInput)
117 val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion)
118 log.info("blueprint base path $basePath")
120 val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
122 executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow(
123 blueprintRuntimeService,
124 executionServiceInput, hashMapOf()
127 val errors = blueprintRuntimeService.getBluePrintError().allErrors()
128 if (errors.isNotEmpty()) {
129 setErrorStatus(errors.joinToString(", "), executionServiceOutput.status)
132 } catch (e: Exception) {
133 log.error("fail processing request id $requestId", e)
134 executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true)
136 // Update process metrics
137 sample.stop(meterRegistry.timer(TIMER_PROCESS, cbaMetricTags(executionServiceInput)))
138 meterRegistry.counter(COUNTER_PROCESS, cbaMetricTags(executionServiceOutput)).increment()
140 publishAuditService.publishExecutionOutput(executionServiceInput.correlationUUID, executionServiceOutput)
142 // store audit input details
143 storeAuditService.storeExecutionOutput(
144 auditStoreId, executionServiceInput.correlationUUID,
145 executionServiceOutput
147 return executionServiceOutput
150 /** If the blueprint name is default, It means no blueprint is needed for the execution */
151 fun checkServiceFunction(executionServiceInput: ExecutionServiceInput): Boolean {
152 return executionServiceInput.actionIdentifiers.blueprintName == "default"
155 /** If no blueprint is needed, then get the Service function instance mapping to the action name and execute it */
156 suspend fun executeServiceFunction(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
157 val actionName = executionServiceInput.actionIdentifiers.actionName
158 val instance = BluePrintDependencyService.instance<AbstractServiceFunction>(actionName)
159 checkNotNull(instance) { "failed to initialize service function($actionName)" }
160 instance.actionName = actionName
161 return instance.applyNB(executionServiceInput)
164 private fun setErrorStatus(errorMessage: String, status: Status) {
165 status.errorMessage = errorMessage
166 status.eventType = EventType.EVENT_COMPONENT_FAILURE.name
168 status.message = BluePrintConstants.STATUS_FAILURE
171 private fun response(
172 executionServiceInput: ExecutionServiceInput,
173 errorMessage: String = "",
174 failure: Boolean = false
175 ): ExecutionServiceOutput {
176 val executionServiceOutput = ExecutionServiceOutput()
177 executionServiceOutput.commonHeader = executionServiceInput.commonHeader
178 executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
179 executionServiceOutput.payload = executionServiceInput.payload
181 val status = Status()
183 setErrorStatus(errorMessage, status)
185 status.eventType = EventType.EVENT_COMPONENT_PROCESSING.name
187 status.message = BluePrintConstants.STATUS_PROCESSING
190 executionServiceOutput.status = status
192 return executionServiceOutput