f3169d937a08df369bbfe15133a4ff071b99e339
[ccsdk/cds.git] /
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.python.executor
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.TimeoutCancellationException
22 import kotlinx.coroutines.async
23 import kotlinx.coroutines.withTimeout
24 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.*
25 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
26 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ExecutionServiceConstant
27 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.RemoteScriptExecutionService
28 import org.onap.ccsdk.cds.controllerblueprints.core.*
29 import org.onap.ccsdk.cds.controllerblueprints.core.data.OperationAssignment
30 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
31 import org.slf4j.LoggerFactory
32 import org.springframework.beans.factory.config.ConfigurableBeanFactory
33 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
34 import org.springframework.context.annotation.Scope
35 import org.springframework.stereotype.Component
36
/**
 * Component function that runs a blueprint's Python script artifact through the
 * [RemoteScriptExecutionService].
 *
 * [processNB] resolves the script artifact of the current node template, builds the command line,
 * optionally asks the remote side to prepare an environment with the requested packages and then
 * executes the command. Status, logs and response payload are published as node attributes
 * ([ATTRIBUTE_EXEC_CMD_STATUS], [ATTRIBUTE_PREPARE_ENV_LOG], [ATTRIBUTE_EXEC_CMD_LOG],
 * [ATTRIBUTE_RESPONSE_DATA]); failures are additionally registered as blueprint errors.
 */
@ConditionalOnBean(name = [ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION])
@Component("component-remote-python-executor")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
open class ComponentRemotePythonExecutor(private val remoteScriptExecutionService: RemoteScriptExecutionService)
    : AbstractComponentFunction() {

    private val log = LoggerFactory.getLogger(ComponentRemotePythonExecutor::class.java)!!

    companion object {
        // Names of the operation inputs read by this component.
        const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
        const val INPUT_DYNAMIC_PROPERTIES = "dynamic-properties"
        const val INPUT_ARGUMENT_PROPERTIES = "argument-properties"

        const val INPUT_COMMAND = "command"
        const val INPUT_PACKAGES = "packages"
        // Selector value that is handed to the remote service as-is instead of being resolved as a DSL expression.
        const val DEFAULT_SELECTOR = "remote-python"
        const val INPUT_ENV_PREPARE_TIMEOUT = "env-prepare-timeout"
        const val INPUT_EXECUTE_TIMEOUT = "execution-timeout"

        // Names of the node attributes written by this component.
        const val ATTRIBUTE_EXEC_CMD_STATUS = "status"
        const val ATTRIBUTE_PREPARE_ENV_LOG = "prepare-environment-logs"
        const val ATTRIBUTE_EXEC_CMD_LOG = "execute-command-logs"
        const val ATTRIBUTE_RESPONSE_DATA = "response-data"
        // Fallback timeouts (seconds) used when the corresponding input is not supplied.
        const val DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC = 120
        const val DEFAULT_EXECUTE_TIMEOUT_IN_SEC = 180
    }

    /**
     * Resolves the python script artifact and the operation inputs, then runs the environment
     * preparation and command execution phases against the remote executor.
     *
     * Command execution is skipped when the blueprint already carries errors after the preparation
     * phase. The remote service is always closed once the connection phase has been entered, even if
     * one of the phases fails unexpectedly.
     *
     * @param executionRequest the incoming execution request (not read directly by this method).
     * @throws BluePrintProcessorException when the operation has no primary implementation artifact.
     */
    override suspend fun processNB(executionRequest: ExecutionServiceInput) {

        log.debug("Processing : $operationInputs")

        val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
        val blueprintName = bluePrintContext.name()
        val blueprintVersion = bluePrintContext.version()

        val operationAssignment: OperationAssignment = bluePrintContext
            .nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName)

        val artifactName: String = operationAssignment.implementation?.primary
            ?: throw BluePrintProcessorException("missing primary field to get artifact name for node template ($nodeTemplateName)")

        val artifactDefinition =
            bluePrintRuntimeService.resolveNodeTemplateArtifactDefinition(nodeTemplateName, artifactName)

        checkNotBlank(artifactDefinition.file) { "couldn't get python script path($artifactName)" }

        val pythonScript = normalizedFile(bluePrintContext.rootPath, artifactDefinition.file)

        checkFileExists(pythonScript) { "python script(${pythonScript.absolutePath}) doesn't exists" }

        val endPointSelector = getOperationInput(INPUT_ENDPOINT_SELECTOR)
        val dynamicProperties = getOptionalOperationInput(INPUT_DYNAMIC_PROPERTIES)
        val packages = getOptionalOperationInput(INPUT_PACKAGES)?.returnNullIfMissing()

        // Arguments are ordered by property name and rendered through their JSON text form
        // (see formatNestedJsonNode), so values keep their JSON quoting and escaping and may contain spaces.
        val args = getOptionalOperationInput(INPUT_ARGUMENT_PROPERTIES)?.returnNullIfMissing()
            ?.rootFieldsToMap()?.toSortedMap()?.values?.joinToString(" ") { formatNestedJsonNode(it) }

        val command = getOperationInput(INPUT_COMMAND).asText()

        /**
         * Timeouts that are specific to the command executor.
         * Note: the interface->input->timeout is the component level timeout.
         */
        val envPrepTimeout = getOptionalOperationInput(INPUT_ENV_PREPARE_TIMEOUT)?.asInt()
            ?: DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC
        val executionTimeout = getOptionalOperationInput(INPUT_EXECUTE_TIMEOUT)?.asInt()
            ?: DEFAULT_EXECUTE_TIMEOUT_IN_SEC

        // The command refers to the script by file name; substitute the resolved absolute path.
        val resolvedCommand = command.replace(pythonScript.name, pythonScript.absolutePath)
        val scriptCommand = if (args.isNullOrEmpty()) resolvedCommand else "$resolvedCommand $args"

        try {
            prepareRemoteEnvironment(endPointSelector, packages, blueprintName, blueprintVersion, envPrepTimeout)

            // if Env preparation was successful, then proceed with command execution in this Env
            if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) {
                executeRemoteCommand(scriptCommand, dynamicProperties, blueprintName, blueprintVersion,
                    executionTimeout)
            }
        } finally {
            // Release the connection regardless of how the phases above ended.
            log.debug("Trying to close GRPC channel. request ($processId)")
            remoteScriptExecutionService.close()
        }
    }

    /**
     * Initialises the remote service for the given endpoint selector and, when [packages] is not
     * null, requests the environment preparation.
     *
     * Failures are not propagated: they are written to the node attributes and registered as
     * blueprint errors, which the caller checks before executing the command.
     */
    private suspend fun prepareRemoteEnvironment(
        endPointSelector: JsonNode,
        packages: JsonNode?,
        blueprintName: String,
        blueprintVersion: String,
        envPrepTimeout: Int
    ) {
        try {
            // Open GRPC Connection
            if (DEFAULT_SELECTOR == endPointSelector.asText()) {
                remoteScriptExecutionService.init(endPointSelector.asText())
            } else {
                // Get endpoint from DSL
                val endPointSelectorJson = bluePrintRuntimeService.resolveDSLExpression(endPointSelector.asText())
                remoteScriptExecutionService.init(endPointSelectorJson)
            }

            // If packages are defined, then install in remote server
            if (packages != null) {
                val prepareEnvInput = PrepareRemoteEnvInput(
                    requestId = processId,
                    remoteIdentifier = RemoteIdentifier(
                        blueprintName = blueprintName,
                        blueprintVersion = blueprintVersion),
                    packages = packages,
                    timeOut = envPrepTimeout.toLong()
                )
                val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput)
                log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}")
                val logs = prepareEnvOutput.response
                val logsEnv = logs.toString().asJsonPrimitive()
                setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv)

                if (prepareEnvOutput.status != StatusType.SUCCESS) {
                    // The command is never executed in this case, so there is no execution log to report.
                    setAttribute(ATTRIBUTE_EXEC_CMD_LOG, "N/A".asJsonPrimitive())
                    setNodeOutputErrors(prepareEnvOutput.status.name, logsEnv)
                } else {
                    setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), logsEnv, "".asJsonPrimitive())
                }
            }
        } catch (grpcEx: io.grpc.StatusRuntimeException) {
            val timeoutErrMsg = "Command executor timed out in GRPC call during env. preparation.. timeout($envPrepTimeout) requestId ($processId)."
            setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, timeoutErrMsg.asJsonPrimitive())
            setNodeOutputErrors(status = timeoutErrMsg, message = "${grpcEx.status}".asJsonPrimitive())
            log.error(timeoutErrMsg, grpcEx)
            addError(timeoutErrMsg)
        } catch (e: Exception) {
            val timeoutErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)."
            setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
            setNodeOutputErrors(status = timeoutErrMsg, message = "${e.message}".asJsonPrimitive())
            log.error("Failed to process on remote executor requestId ($processId)", e)
            addError(timeoutErrMsg)
        }
    }

    /**
     * Sends [scriptCommand] to the remote executor and publishes its status, logs and payload as
     * node attributes.
     *
     * Failures (timeout, gRPC error or any other exception) are not propagated; each of them is
     * recorded through [setNodeOutputErrors] so that the step is reported as failed.
     */
    private suspend fun executeRemoteCommand(
        scriptCommand: String,
        dynamicProperties: JsonNode?,
        blueprintName: String,
        blueprintVersion: String,
        executionTimeout: Int
    ) {
        try {
            // Populate command execution properties and pass it to the remote server
            val properties = dynamicProperties?.returnNullIfMissing()?.rootFieldsToMap() ?: hashMapOf()

            val remoteExecutionInput = RemoteScriptExecutionInput(
                requestId = processId,
                remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion),
                command = scriptCommand,
                properties = properties,
                timeOut = executionTimeout.toLong())

            // NOTE(review): the call runs in GlobalScope so that the local timeout below only has to
            // cancel the await; the started call itself is not cancelled when the timeout fires.
            // Confirm whether that is intended before moving this to structured concurrency.
            val remoteExecutionOutputDeferred = GlobalScope.async {
                remoteScriptExecutionService.executeCommand(remoteExecutionInput)
            }

            val remoteExecutionOutput = withTimeout(executionTimeout * 1000L) {
                remoteExecutionOutputDeferred.await()
            }

            checkNotNull(remoteExecutionOutput) {
                "Error: Request-id $processId did not return a restul from remote command execution."
            }

            val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
            if (remoteExecutionOutput.status != StatusType.SUCCESS) {
                setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload)
            } else {
                setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs,
                    remoteExecutionOutput.payload)
            }
        } catch (timeoutEx: TimeoutCancellationException) {
            val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
            setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive())
            log.error(timeoutErrMsg, timeoutEx)
        } catch (grpcEx: io.grpc.StatusRuntimeException) {
            val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
            setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive())
            log.error("Command executor time out during GRPC call", grpcEx)
        } catch (e: Exception) {
            // Record the failure on the node as well; logging alone would leave the step without
            // any status or error and therefore looking successful.
            val failureMsg = "Command executor failed during command execution.. timeout($executionTimeout) requestId ($processId)."
            setNodeOutputErrors(status = failureMsg, message = "${e.message}".asJsonPrimitive())
            log.error("Failed to process on remote executor requestId ($processId)", e)
        }
    }

    /**
     * Registers the failure message of [runtimeException] as a blueprint error.
     */
    override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {
        bluePrintRuntimeService.getBluePrintError()
            .addError("Failed in ComponentRemotePythonExecutor : ${runtimeException.message}")
    }

    /**
     * Renders [node] as a command line fragment: a value node becomes its JSON text prefixed with a
     * space; for any other node every child is rendered that way and the results are concatenated.
     */
    private fun formatNestedJsonNode(node: JsonNode): String =
        if (node.isValueNode) " $node" else node.joinToString(separator = "") { " $it" }

    /**
     * Utility function to set the output properties of the executor node
     */
    private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) {
        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
        log.info("Executor status   : $status")
        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
        log.info("Executor artifacts: $artifacts")
        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
        log.info("Executor message  : $message")
    }

    /**
     * Utility function to set the output properties and errors of the executor node, in case of errors
     */
    private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) {
        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
        log.info("Executor status   : $status")
        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
        log.info("Executor message  : $message")
        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
        log.info("Executor artifacts: $artifacts")
        addError(status, ATTRIBUTE_EXEC_CMD_LOG, message.toString())
    }
}