cmd-exec payload should be payload, not err msg. 32/110132/4
authorOleg Mitsura <oleg.mitsura@amdocs.com>
Mon, 13 Jul 2020 17:38:10 +0000 (13:38 -0400)
committerOleg Mitsura <oleg.mitsura@amdocs.com>
Tue, 14 Jul 2020 05:21:11 +0000 (01:21 -0400)
Issue-ID: CCSDK-2549

fixes:
1. CMD-exec returns payload as JSON object.
There was some regression where we overwrite returned JSON with errMsg.
2. adds 100ms delta to executionTimeout, as was in our downstream version
(want to guarantee that we get grpc timeout exception on BP side
rather than coroutine w/o waiting extra)
3. setNodeOutputErrors should not have ability to set execution status
to success.

rev1. initial import
rev2. reworked the calls to setNodeOutputErrors: addError was brought out of it
as we can set the node execution error based on prepare.env or execution.
rev3. did not mean to add BlueprintRuntimeService change

Signed-off-by: Oleg Mitsura <oleg.mitsura@amdocs.com>
Change-Id: I6d509df5ae51598f33ab9f0ea53806d653cf79c0

ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt

index e480167..59c4484 100644 (file)
@@ -75,6 +75,7 @@ open class ComponentRemotePythonExecutor(
         const val ATTRIBUTE_RESPONSE_DATA = "response-data"
         const val DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC = 120
         const val DEFAULT_EXECUTE_TIMEOUT_IN_SEC = 180
+        const val TIMEOUT_DELTA = 100L
     }
 
     override suspend fun processNB(executionRequest: ExecutionServiceInput) {
@@ -155,40 +156,37 @@ open class ComponentRemotePythonExecutor(
                 val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput)
                 log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}")
                 val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response)
-                val logsEnv = logs.toString().asJsonPrimitive()
-                setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv)
+                setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logs)
 
+                // there are no artifacts for env. prepare, but we reuse it for err_log...
                 if (prepareEnvOutput.status != StatusType.SUCCESS) {
-                    val errorMessage = prepareEnvOutput.payload
-                    setNodeOutputErrors(prepareEnvOutput.status.name,
-                        STEP_PREPARE_ENV,
-                        logs,
-                        errorMessage,
-                        isLogResponseEnabled
-                    )
+                    setNodeOutputErrors(STEP_PREPARE_ENV, "".asJsonPrimitive(), prepareEnvOutput.payload, isLogResponseEnabled)
+                    addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, logs.toString())
                 } else {
-                    setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(),
-                        STEP_PREPARE_ENV,
-                        logsEnv,
-                        "".asJsonPrimitive(),
-                        isLogResponseEnabled
-                    )
+                    setNodeOutputProperties(prepareEnvOutput.status, STEP_PREPARE_ENV, logs, prepareEnvOutput.payload, isLogResponseEnabled)
                 }
             } else {
                 // set env preparation log to empty...
                 setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, "".asJsonPrimitive())
             }
+            // in cases where the exception is caught in BP side due to timeout, we do not have `err_msg` returned by cmd-exec (inside `payload`),
+            // hence `artifact` field will be empty
         } catch (grpcEx: io.grpc.StatusRuntimeException) {
             val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else ""
-            val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}"
+            val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId).$componentLevelWarningMsg grpcError: (${grpcEx.cause?.message})"
+            // no execution log in case of timeout (as cmd-exec side hasn't finished to transfer output)
+            // set prepare-env-log to the error msg, and cmd-exec-log to empty
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
-            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+            setNodeOutputErrors(STEP_PREPARE_ENV, "".asJsonPrimitive(), "".asJsonPrimitive(), isLogResponseEnabled)
+            addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, grpcErrMsg)
             log.error(grpcErrMsg, grpcEx)
         } catch (e: Exception) {
-            val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}"
-            setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
-            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
-            log.error(timeoutErrMsg, e)
+            val catchallErrMsg = "Command executor failed during env. preparation.. catch-all case. timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}"
+            // no environment prepare log from executor in case of timeout (as cmd-exec side hasn't finished to transfer output), set it to error msg. Execution logs is empty.
+            setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, catchallErrMsg.asJsonPrimitive())
+            setNodeOutputErrors(STEP_PREPARE_ENV, "".asJsonPrimitive(), "".asJsonPrimitive(), isLogResponseEnabled)
+            addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, catchallErrMsg)
+            log.error(catchallErrMsg, e)
         }
         // if Env preparation was successful, then proceed with command execution in this Env
         if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) {
@@ -207,7 +205,7 @@ open class ComponentRemotePythonExecutor(
                     remoteScriptExecutionService.executeCommand(remoteExecutionInput)
                 }
 
-                val remoteExecutionOutput = withTimeout(implementation.timeout * 1000L) {
+                val remoteExecutionOutput = withTimeout(executionTimeout * 1000L + TIMEOUT_DELTA) {
                     remoteExecutionOutputDeferred.await()
                 }
 
@@ -215,44 +213,31 @@ open class ComponentRemotePythonExecutor(
                     "Error: Request-id $processId did not return a result from remote command execution."
                 }
                 val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
+                val returnedPayload = remoteExecutionOutput.payload
+                // In case of execution, `payload` (dictionary from Python execution) is preserved in `remoteExecutionOutput.payload`;
+                // It would contain `err_msg` key. It is valid to return it.
                 if (remoteExecutionOutput.status != StatusType.SUCCESS) {
-                    setNodeOutputErrors(remoteExecutionOutput.status.name,
-                        STEP_EXEC_CMD,
-                        logs,
-                        remoteExecutionOutput.payload,
-                        isLogResponseEnabled
-                    )
+                    setNodeOutputErrors(STEP_EXEC_CMD, logs, returnedPayload, isLogResponseEnabled)
+                    addError(StatusType.FAILURE.name, STEP_EXEC_CMD, logs.toString())
                 } else {
-                    setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(),
-                        STEP_EXEC_CMD,
-                        logs,
-                        remoteExecutionOutput.payload,
-                        isLogResponseEnabled
-                    )
-                }
+                    setNodeOutputProperties(remoteExecutionOutput.status, STEP_EXEC_CMD, logs, returnedPayload, isLogResponseEnabled)
+                } // In timeout exception cases, we don't have payload, hence `payload` is empty value.
             } catch (timeoutEx: TimeoutCancellationException) {
                 val componentLevelWarningMsg = if (timeout < executionTimeout) "Note: component-level timeout ($timeout) is shorter than execution timeout ($executionTimeout). " else ""
                 val timeoutErrMsg = "Command executor execution timeout. DetailedMessage: (${timeoutEx.message}) requestId ($processId). $componentLevelWarningMsg"
-                setNodeOutputErrors(status = StatusType.FAILURE.name,
-                    step = STEP_EXEC_CMD,
-                    logs = "".asJsonPrimitive(),
-                    message = timeoutErrMsg.asJsonPrimitive(),
-                    logging = isLogResponseEnabled
-                )
+                setNodeOutputErrors(STEP_EXEC_CMD, timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+                addError(StatusType.FAILURE.name, STEP_EXEC_CMD, timeoutErrMsg)
                 log.error(timeoutErrMsg, timeoutEx)
             } catch (grpcEx: io.grpc.StatusRuntimeException) {
                 val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId) grpcErr: ${grpcEx.status}"
-                setNodeOutputErrors(status = StatusType.FAILURE.name,
-                    step = STEP_EXEC_CMD,
-                    logs = "".asJsonPrimitive(),
-                    message = timeoutErrMsg.asJsonPrimitive(),
-                    logging = isLogResponseEnabled
-                )
+                setNodeOutputErrors(STEP_EXEC_CMD, timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+                addError(StatusType.FAILURE.name, STEP_EXEC_CMD, timeoutErrMsg)
                 log.error(timeoutErrMsg, grpcEx)
             } catch (e: Exception) {
-                val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}"
-                setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
-                log.error(timeoutErrMsg, e)
+                val catchAllErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}"
+                setNodeOutputErrors(STEP_PREPARE_ENV, catchAllErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+                addError(StatusType.FAILURE.name, STEP_EXEC_CMD, catchAllErrMsg)
+                log.error(catchAllErrMsg, e)
             }
         }
         log.debug("Trying to close GRPC channel. request ($processId)")
@@ -278,21 +263,21 @@ open class ComponentRemotePythonExecutor(
      * Utility function to set the output properties of the executor node
      */
     private fun setNodeOutputProperties(
-        status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(),
+        status: StatusType,
         step: String,
-        logs: JsonNode,
-        message: JsonNode,
+        executionLogs: JsonNode,
+        artifacts: JsonNode,
         logging: Boolean = true
     ) {
 
-        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
-        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
+        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.name.asJsonPrimitive())
+        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, executionLogs)
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
 
         if (logging) {
             log.info("Executor status : $step : $status")
-            log.info("Executor message: $step : $message")
-            log.info("Executor logs   : $step : $logs")
+            log.info("Executor logs   : $step : $executionLogs")
+            log.info("Executor artifacts: $step : $artifacts")
         }
     }
 
@@ -300,22 +285,20 @@ open class ComponentRemotePythonExecutor(
      * Utility function to set the output properties and errors of the executor node, in case of errors
      */
     private fun setNodeOutputErrors(
-        status: String,
         step: String,
-        logs: JsonNode = "N/A".asJsonPrimitive(),
-        message: JsonNode,
+        executionLogs: JsonNode = "N/A".asJsonPrimitive(),
+        artifacts: JsonNode = "N/A".asJsonPrimitive(),
         logging: Boolean = true
     ) {
+        val status = StatusType.FAILURE.name
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
-        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
+        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, executionLogs)
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
 
         if (logging) {
             log.info("Executor status : $step : $status")
-            log.info("Executor message: $step : $message")
-            log.info("Executor logs   : $step : $logs")
+            log.info("Executor logs   : $step : $executionLogs")
+            log.info("Executor artifacts: $step : $artifacts")
         }
-
-        addError(status, step, logs.toString())
     }
 }