Improving CMD-exec err msgs/handling.
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / python-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / python / executor / ComponentRemotePythonExecutor.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.python.executor
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import kotlinx.coroutines.GlobalScope
21 import kotlinx.coroutines.TimeoutCancellationException
22 import kotlinx.coroutines.async
23 import kotlinx.coroutines.withTimeout
24 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
25 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.PrepareRemoteEnvInput
26 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteIdentifier
27 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteScriptExecutionInput
28 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StatusType
29 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
30 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ExecutionServiceConstant
31 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.RemoteScriptExecutionService
32 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
33 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
34 import org.onap.ccsdk.cds.controllerblueprints.core.checkFileExists
35 import org.onap.ccsdk.cds.controllerblueprints.core.checkNotBlank
36 import org.onap.ccsdk.cds.controllerblueprints.core.data.OperationAssignment
37 import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
38 import org.onap.ccsdk.cds.controllerblueprints.core.returnNullIfMissing
39 import org.onap.ccsdk.cds.controllerblueprints.core.rootFieldsToMap
40 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
41 import org.slf4j.LoggerFactory
42 import org.springframework.beans.factory.config.ConfigurableBeanFactory
43 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
44 import org.springframework.context.annotation.Scope
45 import org.springframework.stereotype.Component
46
/**
 * Component function that runs a blueprint's Python script artifact through a
 * [RemoteScriptExecutionService].
 *
 * [processNB] resolves the script artifact of the current node template, optionally asks the
 * remote side to prepare an environment with the requested packages, executes the command and
 * publishes status, logs and response data as node attributes.
 */
@ConditionalOnBean(name = [ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION])
@Component("component-remote-python-executor")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
open class ComponentRemotePythonExecutor(private val remoteScriptExecutionService: RemoteScriptExecutionService) : AbstractComponentFunction() {

    private val log = LoggerFactory.getLogger(ComponentRemotePythonExecutor::class.java)

    companion object {
        // Names of the operation inputs read by processNB.
        const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
        const val INPUT_DYNAMIC_PROPERTIES = "dynamic-properties"
        const val INPUT_ARGUMENT_PROPERTIES = "argument-properties"

        const val INPUT_COMMAND = "command"
        const val INPUT_PACKAGES = "packages"
        // Endpoint selector value that is passed to the service as-is instead of being resolved as a DSL expression.
        const val DEFAULT_SELECTOR = "remote-python"
        const val INPUT_ENV_PREPARE_TIMEOUT = "env-prepare-timeout"
        const val INPUT_EXECUTE_TIMEOUT = "execution-timeout"

        // Names of the node attributes written by this component.
        const val ATTRIBUTE_EXEC_CMD_STATUS = "status"
        const val ATTRIBUTE_PREPARE_ENV_LOG = "prepare-environment-logs"
        const val ATTRIBUTE_EXEC_CMD_LOG = "execute-command-logs"
        const val ATTRIBUTE_RESPONSE_DATA = "response-data"

        // Timeouts (in seconds) used when the corresponding timeout input is not supplied.
        const val DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC = 120
        const val DEFAULT_EXECUTE_TIMEOUT_IN_SEC = 180
    }

    /**
     * Resolves the Python script artifact, prepares the remote environment when packages are
     * given, and runs the command remotely.
     *
     * Failures of the remote calls are not rethrown: they are recorded through
     * [setNodeOutputErrors] / `addError` and logged. The remote service is closed in all cases
     * once the connection phase has been entered.
     *
     * @param executionRequest the incoming execution request (not read directly by this method).
     * @throws BluePrintProcessorException if the operation has no primary implementation artifact.
     */
    override suspend fun processNB(executionRequest: ExecutionServiceInput) {

        log.debug("Processing : $operationInputs")

        val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
        val blueprintName = bluePrintContext.name()
        val blueprintVersion = bluePrintContext.version()

        val operationAssignment: OperationAssignment = bluePrintContext
            .nodeTemplateInterfaceOperation(nodeTemplateName, interfaceName, operationName)

        val artifactName: String = operationAssignment.implementation?.primary
            ?: throw BluePrintProcessorException("missing primary field to get artifact name for node template ($nodeTemplateName)")

        val artifactDefinition =
            bluePrintRuntimeService.resolveNodeTemplateArtifactDefinition(nodeTemplateName, artifactName)

        checkNotBlank(artifactDefinition.file) { "couldn't get python script path($artifactName)" }

        val pythonScript = normalizedFile(bluePrintContext.rootPath, artifactDefinition.file)

        checkFileExists(pythonScript) { "python script(${pythonScript.absolutePath}) doesn't exists" }

        val endPointSelector = getOperationInput(INPUT_ENDPOINT_SELECTOR)
        val dynamicProperties = getOptionalOperationInput(INPUT_DYNAMIC_PROPERTIES)
        val packages = getOptionalOperationInput(INPUT_PACKAGES)?.returnNullIfMissing()

        // This prevents unescaping values, as well as quoting each parameter, in order to allow for spaces in values.
        // Values are emitted in the sorted order of their property names.
        val args = getOptionalOperationInput(INPUT_ARGUMENT_PROPERTIES)?.returnNullIfMissing()
            ?.rootFieldsToMap()?.toSortedMap()?.values?.joinToString(" ") { formatNestedJsonNode(it) }

        val command = getOperationInput(INPUT_COMMAND).asText()

        /**
         * Timeouts that are specific to the command executor.
         * Note: the interface->input->timeout is the component level timeout.
         */
        val envPrepTimeout = getOptionalOperationInput(INPUT_ENV_PREPARE_TIMEOUT)?.asInt()
            ?: DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC
        val executionTimeout = getOptionalOperationInput(INPUT_EXECUTE_TIMEOUT)?.asInt()
            ?: DEFAULT_EXECUTE_TIMEOUT_IN_SEC

        // The command refers to the script by file name; swap in its absolute path and append the arguments, if any.
        val commandWithScriptPath = command.replace(pythonScript.name, pythonScript.absolutePath)
        val scriptCommand = if (args.isNullOrEmpty()) commandWithScriptPath else "$commandWithScriptPath $args"

        try {
            try {
                // Open GRPC Connection
                if (DEFAULT_SELECTOR == endPointSelector.asText()) {
                    remoteScriptExecutionService.init(endPointSelector.asText())
                } else {
                    // Get endpoint from DSL
                    val endPointSelectorJson = bluePrintRuntimeService.resolveDSLExpression(endPointSelector.asText())
                    remoteScriptExecutionService.init(endPointSelectorJson)
                }

                // If packages are defined, then install in remote server
                if (packages != null) {
                    val prepareEnvInput = PrepareRemoteEnvInput(
                        requestId = processId,
                        remoteIdentifier = RemoteIdentifier(
                            blueprintName = blueprintName,
                            blueprintVersion = blueprintVersion),
                        packages = packages,
                        timeOut = envPrepTimeout.toLong()

                    )
                    val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput)
                    log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}")
                    val logs = prepareEnvOutput.response
                    val logsEnv = logs.toString().asJsonPrimitive()
                    setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv)

                    if (prepareEnvOutput.status != StatusType.SUCCESS) {
                        // The command is never run when preparation fails, so there is no execution log.
                        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, "N/A".asJsonPrimitive())
                        setNodeOutputErrors(prepareEnvOutput.status.name, logsEnv)
                    } else {
                        setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), logsEnv, "".asJsonPrimitive())
                    }
                } else {
                    // set env preparation log to empty...
                    setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, "".asJsonPrimitive())
                }
            } catch (grpcEx: io.grpc.StatusRuntimeException) {
                val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId)."
                setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
                setNodeOutputErrors(status = grpcErrMsg, message = "${grpcEx.status}".asJsonPrimitive())
                log.error(grpcErrMsg, grpcEx)
                addError(grpcErrMsg)
            } catch (e: Exception) {
                val envPrepErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)."
                setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
                setNodeOutputErrors(status = envPrepErrMsg, message = "${e.message}".asJsonPrimitive())
                log.error("Failed to process on remote executor requestId ($processId)", e)
                addError(envPrepErrMsg)
            }
            // if Env preparation was successful, then proceed with command execution in this Env
            if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) {
                try {
                    // Populate command execution properties and pass it to the remote server
                    val properties = dynamicProperties?.returnNullIfMissing()?.rootFieldsToMap() ?: hashMapOf()

                    // The executor-specific timeout is used both for the remote request and for the
                    // local wait below, so that it matches the value reported in the error messages.
                    val remoteExecutionInput = RemoteScriptExecutionInput(
                        requestId = processId,
                        remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion),
                        command = scriptCommand,
                        properties = properties,
                        timeOut = executionTimeout.toLong())

                    // NOTE(review): the call is started outside the current scope, presumably so that
                    // the withTimeout below can give up waiting even if the remote call does not return.
                    val remoteExecutionOutputDeferred = GlobalScope.async {
                        remoteScriptExecutionService.executeCommand(remoteExecutionInput)
                    }

                    val remoteExecutionOutput = withTimeout(executionTimeout * 1000L) {
                        remoteExecutionOutputDeferred.await()
                    }

                    checkNotNull(remoteExecutionOutput) {
                        "Error: Request-id $processId did not return a result from remote command execution."
                    }
                    val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
                    if (remoteExecutionOutput.status != StatusType.SUCCESS) {
                        setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload)
                    } else {
                        setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs,
                            remoteExecutionOutput.payload)
                    }
                } catch (timeoutEx: TimeoutCancellationException) {
                    val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
                    setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive())
                    log.error(timeoutErrMsg, timeoutEx)
                } catch (grpcEx: io.grpc.StatusRuntimeException) {
                    val grpcErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
                    // Surface the gRPC status as the message, as the env. preparation handler does.
                    setNodeOutputErrors(status = grpcErrMsg, message = "${grpcEx.status}".asJsonPrimitive())
                    log.error("Command executor time out during GRPC call", grpcEx)
                } catch (e: Exception) {
                    // Record the failure on the node instead of only logging it, so it is not reported as a success.
                    val execErrMsg = "Failed to process on remote executor requestId ($processId)"
                    setNodeOutputErrors(status = execErrMsg, message = "${e.message}".asJsonPrimitive())
                    log.error(execErrMsg, e)
                }
            }
        } finally {
            // Always release the remote service, even if something escaped the handlers above.
            log.debug("Trying to close GRPC channel. request ($processId)")
            remoteScriptExecutionService.close()
        }
    }

    /**
     * Records the failure message of [runtimeException] on the blueprint error holder.
     */
    override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {
        bluePrintRuntimeService.getBluePrintError()
            .addError("Failed in ComponentRemotePythonExecutor : ${runtimeException.message}")
    }

    /**
     * Renders [node] as command-line text: a value node becomes a single space followed by its
     * string form; any other node becomes the concatenation of a space plus the string form of
     * each of its children (empty string when it has none).
     */
    private fun formatNestedJsonNode(node: JsonNode): String =
        if (node.isValueNode) " $node" else node.joinToString(separator = "") { " $it" }

    /**
     * Utility function to set the output properties of the executor node
     *
     * @param status value stored under [ATTRIBUTE_EXEC_CMD_STATUS]
     * @param message value stored under [ATTRIBUTE_EXEC_CMD_LOG]
     * @param artifacts value stored under [ATTRIBUTE_RESPONSE_DATA]
     */
    private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) {
        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
        log.info("Executor status   : $status")
        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
        log.info("Executor artifacts: $artifacts")
        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
        log.info("Executor message  : $message")
    }

    /**
     * Utility function to set the output properties and errors of the executor node, in case of errors
     *
     * Besides setting the three attributes, the error is registered via `addError` using
     * [status] as its type and [ATTRIBUTE_EXEC_CMD_LOG] as its name.
     *
     * @param status value stored under [ATTRIBUTE_EXEC_CMD_STATUS]
     * @param message value stored under [ATTRIBUTE_EXEC_CMD_LOG]
     * @param artifacts value stored under [ATTRIBUTE_RESPONSE_DATA]; defaults to an empty string node
     */
    private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) {
        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
        log.info("Executor status   : $status")
        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
        log.info("Executor message  : $message")
        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
        log.info("Executor artifacts: $artifacts")
        addError(status, ATTRIBUTE_EXEC_CMD_LOG, message.toString())
    }
}