Command Executor : Invalid response_data when executed script fails 64/109664/9
authorJulien Fontaine <julien.fontaine@bell.ca>
Mon, 29 Jun 2020 23:54:27 +0000 (19:54 -0400)
committerJulien Fontaine <julien.fontaine@bell.ca>
Thu, 2 Jul 2020 15:36:20 +0000 (15:36 +0000)
* Modified command exec returned value in case of failure during execution. It now prints the response_data defined by the user
* Modified truncation method of the gRPC returned object to use ByteSize() to get the exact sizxe consumed within the buffer

Issue-ID: CCSDK-2501
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Ie1db8db265623b5137ab3946ff4e3abda1c54a78

ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
ms/command-executor/src/main/docker/Dockerfile
ms/command-executor/src/main/python/command_executor_handler.py
ms/command-executor/src/main/python/command_executor_server.py
ms/command-executor/src/main/python/utils.py

index 50f0b14..7f32fa9 100644 (file)
@@ -182,12 +182,12 @@ open class ComponentRemotePythonExecutor(
             val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else ""
             val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}"
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
-            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
             log.error(grpcErrMsg, grpcEx)
         } catch (e: Exception) {
             val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}"
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
-            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
             log.error(timeoutErrMsg, e)
         }
         // if Env preparation was successful, then proceed with command execution in this Env
@@ -236,7 +236,7 @@ open class ComponentRemotePythonExecutor(
                 setNodeOutputErrors(status = StatusType.FAILURE.name,
                     step = STEP_EXEC_CMD,
                     logs = "".asJsonPrimitive(),
-                    error = timeoutErrMsg.asJsonPrimitive(),
+                    message = timeoutErrMsg.asJsonPrimitive(),
                     logging = isLogResponseEnabled
                 )
                 log.error(timeoutErrMsg, timeoutEx)
@@ -245,13 +245,13 @@ open class ComponentRemotePythonExecutor(
                 setNodeOutputErrors(status = StatusType.FAILURE.name,
                     step = STEP_EXEC_CMD,
                     logs = "".asJsonPrimitive(),
-                    error = timeoutErrMsg.asJsonPrimitive(),
+                    message = timeoutErrMsg.asJsonPrimitive(),
                     logging = isLogResponseEnabled
                 )
                 log.error(timeoutErrMsg, grpcEx)
             } catch (e: Exception) {
                 val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}"
-                setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, error = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+                setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
                 log.error(timeoutErrMsg, e)
             }
         }
@@ -280,42 +280,42 @@ open class ComponentRemotePythonExecutor(
     private fun setNodeOutputProperties(
         status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(),
         step: String,
+        logs: JsonNode,
         message: JsonNode,
-        artifacts: JsonNode,
         logging: Boolean = true
     ) {
 
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
-        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
+        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
 
         if (logging) {
-            log.info("Executor status   : $step : $status")
-            log.info("Executor artifacts: $step : $artifacts")
-            log.info("Executor message  : $step : $message")
+            log.info("Executor status : $step : $status")
+            log.info("Executor message: $step : $message")
+            log.info("Executor logs   : $step : $logs")
         }
     }
 
     /**
-     * Utility function to set the output properties and errors of the executor node, in cas of errors
+     * Utility function to set the output properties and errors of the executor node, in case of errors
      */
     private fun setNodeOutputErrors(
         status: String,
         step: String,
         logs: JsonNode = "N/A".asJsonPrimitive(),
-        error: JsonNode,
+        message: JsonNode,
         logging: Boolean = true
     ) {
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, "N/A".asJsonPrimitive())
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
 
         if (logging) {
-            log.info("Executor status   : $step : $status")
-            log.info("Executor message  : $step : $error")
-            log.info("Executor logs     : $step : $logs")
+            log.info("Executor status : $step : $status")
+            log.info("Executor message: $step : $message")
+            log.info("Executor logs   : $step : $logs")
         }
 
-        addError(status, step, error.toString())
+        addError(status, step, logs.toString())
     }
 }
index 610e10c..1e5d4cb 100644 (file)
@@ -3,7 +3,7 @@ FROM python:3.6-slim
 ENV GRPC_PYTHON_VERSION 1.20.0
 RUN python -m pip install --upgrade pip
 RUN pip install grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION}
-RUN pip install virtualenv==16.7.9 pympler==0.8
+RUN pip install virtualenv==16.7.9
 
 RUN groupadd -r -g 1000 onap && useradd -r -u 1000 -g onap onap
 
index 7c9ef84..0533b41 100644 (file)
@@ -143,21 +143,15 @@ class CommandExecutorHandler():
                 rc = newProcess.poll()
         except Exception as e:
             err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e)
-            return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
+            result.update(utils.build_ret_data(False, results_log=results_log, error=err_msg))
+            return result
 
         # deactivate_venv(blueprint_id)
         #Since return code is only used to check if it's zero (success), we can just return success flag instead.
         self.logger.debug("python return_code : {}".format(rc))
-        if rc == 0:
-            return utils.build_ret_data(True, results=result, results_log=results_log)
-        else:
-            err_msg = ""
-            if len(results_log) > 0:
-                # get exception message
-                err_msg = "{} - {}".format(self.blueprint_id, results_log[-1:][0])
-            else:
-                err_msg = "{} - Process exited with return code {}".format(self.blueprint_id, rc)
-            return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
+        is_execution_successful = rc == 0
+        result.update(utils.build_ret_data(is_execution_successful, results_log=results_log))
+        return result
 
     def install_packages(self, request, type, f, results):
         success = self.install_python_packages('UTILITY', results)
@@ -233,11 +227,11 @@ class CommandExecutorHandler():
             venv.create(self.venv_home, with_pip=True, system_site_packages=True)
             virtualenv.writefile(os.path.join(bin_dir, "activate_this.py"), virtualenv.ACTIVATE_THIS)
             self.logger.info("{} - Creation of Python Virtual Environment finished.".format(self.blueprint_id))
-            return utils.build_ret_data(True, "")
+            return utils.build_ret_data(True)
         except Exception as err:
             err_msg = "{} - Failed to provision Python Virtual Environment. Error: {}".format(self.blueprint_id, err)
             self.logger.info(err_msg)
-            return utils.build_ret_data(False, err_msg)
+            return utils.build_ret_data(False, error=err_msg)
 
     # return map cds_is_successful and err_msg. Status is True on success. err_msg may existence doesn't necessarily indicate fatal condition.
     # the 'status' should be set to False to indicate error.
@@ -255,11 +249,11 @@ class CommandExecutorHandler():
                 exec (activate_this_script.read(), {'__file__': path})
             exec (fixpathenvvar)
             self.logger.info("Running with PATH : {}".format(os.environ['PATH']))
-            return utils.build_ret_data(True, "")
+            return utils.build_ret_data(True)
         except Exception as err:
             err_msg ="{} - Failed to activate Python Virtual Environment. Error: {}".format(self.blueprint_id, err)
             self.logger.info( err_msg)
-            return utils.build_ret_data(False, err_msg)
+            return utils.build_ret_data(False, error=err_msg)
 
     def deactivate_venv(self):
         self.logger.info("{} - Deactivate Python Virtual Environment".format(self.blueprint_id))
index 2070976..aa666ee 100644 (file)
@@ -47,18 +47,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi
         if os.environ.get('CE_DEBUG','false') == "true":
             self.logger.info(request)
 
-        log_results = []
-        payload_result = {}
         handler = CommandExecutorHandler(request)
         exec_cmd_response = handler.execute_command(request)
         if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]:
             self.logger.info("{} - Execution finished successfully.".format(blueprint_id))
-            self.logger.info("{} - Log Results {}: ".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY]))
-            self.logger.info("{} - Results : {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_KEY]))
         else:
-            self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.ERR_MSG_KEY]))
+            self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY]))
 
         ret = utils.build_grpc_response(request.requestId, exec_cmd_response)
-        self.logger.info("Response returned : {}".format(exec_cmd_response))
+        self.logger.info("Payload returned : {}".format(exec_cmd_response))
 
         return ret
\ No newline at end of file
index b982416..180cd8c 100644 (file)
@@ -17,13 +17,12 @@ from google.protobuf.timestamp_pb2 import Timestamp
 
 import proto.CommandExecutor_pb2 as CommandExecutor_pb2
 import json
-from pympler import asizeof
 
 CDS_IS_SUCCESSFUL_KEY = "cds_is_successful"
 ERR_MSG_KEY = "err_msg"
 RESULTS_KEY = "results"
 RESULTS_LOG_KEY = "results_log"
-TRUNC_MSG_LEN = 3 * 1024 * 1024
+RESPONSE_MAX_SIZE = 4 * 1024 * 1024 # 4Mb
 
 def get_blueprint_id(request):
   blueprint_name = request.identifiers.blueprintName
@@ -34,44 +33,42 @@ def get_blueprint_id(request):
 def build_grpc_response(request_id, response):
   if response[CDS_IS_SUCCESSFUL_KEY]:
     status = CommandExecutor_pb2.SUCCESS
-    payload = json.dumps(response[RESULTS_KEY])
   else:
     status = CommandExecutor_pb2.FAILURE
-    # truncate error message if too heavy
-    if asizeof.asizeof(response[ERR_MSG_KEY]) > TRUNC_MSG_LEN:
-      response[ERR_MSG_KEY] = "{} [...]. Check command executor logs for more information.".format(response[ERR_MSG_KEY][:TRUNC_MSG_LEN])
-    payload = json.dumps(response[ERR_MSG_KEY])
 
-  # truncate cmd-exec logs if too heavy
-  response[RESULTS_LOG_KEY] = truncate_cmd_exec_logs(response[RESULTS_LOG_KEY])
+  response.pop(CDS_IS_SUCCESSFUL_KEY)
+  logs = response.pop(RESULTS_LOG_KEY)
+
+  # Payload should only contains response data returned from the executed script and/or the error message
+  payload = json.dumps(response)
 
   timestamp = Timestamp()
   timestamp.GetCurrentTime()
 
-  return CommandExecutor_pb2.ExecutionOutput(requestId=request_id,
-                                             response=response[RESULTS_LOG_KEY],
+  execution_output = CommandExecutor_pb2.ExecutionOutput(requestId=request_id,
+                                             response=logs,
                                              status=status,
                                              payload=payload,
                                              timestamp=timestamp)
 
-# build a ret data structure
-def build_ret_data(cds_is_successful, results={}, results_log=[], error=None):
+  return truncate_execution_output(execution_output)
+
+# build a ret data structure used to populate the ExecutionOutput
+def build_ret_data(cds_is_successful, results_log=[], error=None):
   ret_data = {
-            CDS_IS_SUCCESSFUL_KEY: cds_is_successful,
-            RESULTS_KEY: results,
-            RESULTS_LOG_KEY: results_log
-         }
+    CDS_IS_SUCCESSFUL_KEY: cds_is_successful,
+    RESULTS_LOG_KEY: results_log
+  }
   if error:
     ret_data[ERR_MSG_KEY] = error
   return ret_data
 
-def truncate_cmd_exec_logs(logs):
-    truncated_logs = []
-    truncated_logs_memsize = 0
-    for log in logs:
-        truncated_logs_memsize += asizeof.asizeof(log)
-        if truncated_logs_memsize > TRUNC_MSG_LEN:
-            truncated_logs.append("Execution logs exceeds the maximum size allowed. Check command executor logs to view the execute-command-logs.")
-            break
-        truncated_logs.append(log)
-    return truncated_logs
\ No newline at end of file
+# Truncate execution logs to make sure gRPC response doesn't exceed the gRPC buffer capacity
+def truncate_execution_output(execution_output):
+  sum_truncated_chars = 0
+  if execution_output.ByteSize() > RESPONSE_MAX_SIZE:
+    while execution_output.ByteSize() > RESPONSE_MAX_SIZE:
+        removed_item = execution_output.response.pop()
+        sum_truncated_chars += len(removed_item)
+    execution_output.response.append("[...] TRUNCATED CHARS : {}".format(sum_truncated_chars))
+  return execution_output
\ No newline at end of file