Enabling Code Formatter
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / python-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / python / executor / ComponentRemotePythonExecutor.kt
index 7f32fa9..ce51bd9 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright Â© 2019 IBM.
+ *  Modifications Copyright © 2020 Bell Canada.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -56,6 +57,7 @@ open class ComponentRemotePythonExecutor(
     private val log = LoggerFactory.getLogger(ComponentRemotePythonExecutor::class.java)!!
 
     companion object {
+
         const val SELECTOR_CMD_EXEC = "blueprintsprocessor.remote-script-command"
         const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
         const val INPUT_DYNAMIC_PROPERTIES = "dynamic-properties"
@@ -75,6 +77,7 @@ open class ComponentRemotePythonExecutor(
         const val ATTRIBUTE_RESPONSE_DATA = "response-data"
         const val DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC = 120
         const val DEFAULT_EXECUTE_TIMEOUT_IN_SEC = 180
+        const val TIMEOUT_DELTA = 100L
     }
 
     override suspend fun processNB(executionRequest: ExecutionServiceInput) {
@@ -144,10 +147,13 @@ open class ComponentRemotePythonExecutor(
             // If packages are defined, then install in remote server
             if (packages != null) {
                 val prepareEnvInput = PrepareRemoteEnvInput(
+                    originatorId = executionServiceInput.commonHeader.originatorId,
                     requestId = processId,
+                    subRequestId = executionServiceInput.commonHeader.subRequestId,
                     remoteIdentifier = RemoteIdentifier(
                         blueprintName = blueprintName,
-                        blueprintVersion = blueprintVersion),
+                        blueprintVersion = blueprintVersion
+                    ),
                     packages = packages,
                     timeOut = envPrepTimeout.toLong()
 
@@ -155,40 +161,40 @@ open class ComponentRemotePythonExecutor(
                 val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput)
                 log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}")
                 val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response)
-                val logsEnv = logs.toString().asJsonPrimitive()
-                setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv)
+                setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logs)
 
+                // there are no artifacts for env. prepare, but we reuse it for err_log...
                 if (prepareEnvOutput.status != StatusType.SUCCESS) {
-                    val errorMessage = prepareEnvOutput.payload
-                    setNodeOutputErrors(prepareEnvOutput.status.name,
-                        STEP_PREPARE_ENV,
-                        logs,
-                        errorMessage,
-                        isLogResponseEnabled
-                    )
+                    setNodeOutputErrors(STEP_PREPARE_ENV, "[]".asJsonPrimitive(), prepareEnvOutput.payload, isLogResponseEnabled)
+                    addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, logs.toString())
                 } else {
-                    setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(),
-                        STEP_PREPARE_ENV,
-                        logsEnv,
-                        "".asJsonPrimitive(),
-                        isLogResponseEnabled
-                    )
+                    setNodeOutputProperties(prepareEnvOutput.status, STEP_PREPARE_ENV, logs, prepareEnvOutput.payload, isLogResponseEnabled)
                 }
             } else {
                 // set env preparation log to empty...
                 setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, "".asJsonPrimitive())
             }
+            // in cases where the exception is caught in BP side due to timeout, we do not have `err_msg` returned by cmd-exec (inside `payload`),
+            // hence `artifact` field will be empty
         } catch (grpcEx: io.grpc.StatusRuntimeException) {
-            val componentLevelWarningMsg = if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else ""
-            val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId). $componentLevelWarningMsg grpcError: ${grpcEx.status}"
+            val componentLevelWarningMsg =
+                if (timeout < envPrepTimeout) "Note: component-level timeout ($timeout) is shorter than env-prepare timeout ($envPrepTimeout). " else ""
+            val grpcErrMsg =
+                "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId).$componentLevelWarningMsg grpcError: (${grpcEx.cause?.message})"
+            // no execution log in case of timeout (as cmd-exec side hasn't finished to transfer output)
+            // set prepare-env-log to the error msg, and cmd-exec-log to empty
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
-            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = grpcErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
+            setNodeOutputErrors(STEP_PREPARE_ENV, "[]".asJsonPrimitive(), "{}".asJsonPrimitive(), isLogResponseEnabled)
+            addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, grpcErrMsg)
             log.error(grpcErrMsg, grpcEx)
         } catch (e: Exception) {
-            val timeoutErrMsg = "Command executor failed during env. preparation.. catch-all case timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}"
-            setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
-            setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
-            log.error(timeoutErrMsg, e)
+            val catchallErrMsg =
+                "Command executor failed during env. preparation.. catch-all case. timeout($envPrepTimeout) requestId ($processId). exception msg: ${e.message}"
+            // no environment prepare log from executor in case of timeout (as cmd-exec side hasn't finished to transfer output), set it to error msg. Execution logs is empty.
+            setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, catchallErrMsg.asJsonPrimitive())
+            setNodeOutputErrors(STEP_PREPARE_ENV, "[]".asJsonPrimitive(), "{}".asJsonPrimitive(), isLogResponseEnabled)
+            addError(StatusType.FAILURE.name, STEP_PREPARE_ENV, catchallErrMsg)
+            log.error(catchallErrMsg, e)
         }
         // if Env preparation was successful, then proceed with command execution in this Env
         if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) {
@@ -197,17 +203,20 @@ open class ComponentRemotePythonExecutor(
                 val properties = dynamicProperties?.returnNullIfMissing()?.rootFieldsToMap() ?: hashMapOf()
 
                 val remoteExecutionInput = RemoteScriptExecutionInput(
+                    originatorId = executionServiceInput.commonHeader.originatorId,
                     requestId = processId,
+                    subRequestId = executionServiceInput.commonHeader.subRequestId,
                     remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion),
                     command = scriptCommand,
                     properties = properties,
-                    timeOut = implementation.timeout.toLong())
+                    timeOut = executionTimeout.toLong()
+                )
 
                 val remoteExecutionOutputDeferred = GlobalScope.async {
                     remoteScriptExecutionService.executeCommand(remoteExecutionInput)
                 }
 
-                val remoteExecutionOutput = withTimeout(implementation.timeout * 1000L) {
+                val remoteExecutionOutput = withTimeout(executionTimeout * 1000L + TIMEOUT_DELTA) {
                     remoteExecutionOutputDeferred.await()
                 }
 
@@ -215,44 +224,35 @@ open class ComponentRemotePythonExecutor(
                     "Error: Request-id $processId did not return a result from remote command execution."
                 }
                 val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
+                val returnedPayload = remoteExecutionOutput.payload
+                // In case of execution, `payload` (dictionary from Python execution) is preserved in `remoteExecutionOutput.payload`;
+                // It would contain `err_msg` key. It is valid to return it.
                 if (remoteExecutionOutput.status != StatusType.SUCCESS) {
-                    setNodeOutputErrors(remoteExecutionOutput.status.name,
-                        STEP_EXEC_CMD,
-                        logs,
-                        remoteExecutionOutput.payload,
-                        isLogResponseEnabled
-                    )
+                    setNodeOutputErrors(STEP_EXEC_CMD, logs, returnedPayload, isLogResponseEnabled)
+                    addError(StatusType.FAILURE.name, STEP_EXEC_CMD, logs.toString())
                 } else {
-                    setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(),
-                        STEP_EXEC_CMD,
-                        logs,
-                        remoteExecutionOutput.payload,
-                        isLogResponseEnabled
-                    )
-                }
+                    setNodeOutputProperties(remoteExecutionOutput.status, STEP_EXEC_CMD, logs, returnedPayload, isLogResponseEnabled)
+                } // In timeout exception cases, we don't have payload, hence `payload` is empty value.
             } catch (timeoutEx: TimeoutCancellationException) {
-                val componentLevelWarningMsg = if (timeout < executionTimeout) "Note: component-level timeout ($timeout) is shorter than execution timeout ($executionTimeout). " else ""
-                val timeoutErrMsg = "Command executor execution timeout. DetailedMessage: (${timeoutEx.message}) requestId ($processId). $componentLevelWarningMsg"
-                setNodeOutputErrors(status = StatusType.FAILURE.name,
-                    step = STEP_EXEC_CMD,
-                    logs = "".asJsonPrimitive(),
-                    message = timeoutErrMsg.asJsonPrimitive(),
-                    logging = isLogResponseEnabled
-                )
+                val componentLevelWarningMsg =
+                    if (timeout < executionTimeout) "Note: component-level timeout ($timeout) is shorter than execution timeout ($executionTimeout). " else ""
+                val timeoutErrMsg =
+                    "Command executor execution timeout. DetailedMessage: (${timeoutEx.message}) requestId ($processId). $componentLevelWarningMsg"
+                setNodeOutputErrors(STEP_EXEC_CMD, listOf(timeoutErrMsg).asJsonPrimitive(), logging = isLogResponseEnabled)
+                addError(StatusType.FAILURE.name, STEP_EXEC_CMD, timeoutErrMsg)
                 log.error(timeoutErrMsg, timeoutEx)
             } catch (grpcEx: io.grpc.StatusRuntimeException) {
-                val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId) grpcErr: ${grpcEx.status}"
-                setNodeOutputErrors(status = StatusType.FAILURE.name,
-                    step = STEP_EXEC_CMD,
-                    logs = "".asJsonPrimitive(),
-                    message = timeoutErrMsg.asJsonPrimitive(),
-                    logging = isLogResponseEnabled
-                )
+                val timeoutErrMsg =
+                    "Command executor timed out executing after $executionTimeout seconds requestId ($processId) grpcErr: ${grpcEx.status}"
+                setNodeOutputErrors(STEP_EXEC_CMD, listOf(timeoutErrMsg).asJsonPrimitive(), logging = isLogResponseEnabled)
+                addError(StatusType.FAILURE.name, STEP_EXEC_CMD, timeoutErrMsg)
                 log.error(timeoutErrMsg, grpcEx)
             } catch (e: Exception) {
-                val timeoutErrMsg = "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}"
-                setNodeOutputErrors(status = StatusType.FAILURE.name, step = STEP_PREPARE_ENV, message = timeoutErrMsg.asJsonPrimitive(), logging = isLogResponseEnabled)
-                log.error(timeoutErrMsg, e)
+                val catchAllErrMsg =
+                    "Command executor failed during process catch-all case requestId ($processId) timeout($envPrepTimeout) exception msg: ${e.message}"
+                setNodeOutputErrors(STEP_PREPARE_ENV, listOf(catchAllErrMsg).asJsonPrimitive(), logging = isLogResponseEnabled)
+                addError(StatusType.FAILURE.name, STEP_EXEC_CMD, catchAllErrMsg)
+                log.error(catchAllErrMsg, e)
             }
         }
         log.debug("Trying to close GRPC channel. request ($processId)")
@@ -278,21 +278,21 @@ open class ComponentRemotePythonExecutor(
      * Utility function to set the output properties of the executor node
      */
     private fun setNodeOutputProperties(
-        status: JsonNode = StatusType.FAILURE.name.asJsonPrimitive(),
+        status: StatusType,
         step: String,
-        logs: JsonNode,
-        message: JsonNode,
+        executionLogs: JsonNode,
+        artifacts: JsonNode,
         logging: Boolean = true
     ) {
 
-        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
-        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
+        setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.name.asJsonPrimitive())
+        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, executionLogs)
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
 
         if (logging) {
             log.info("Executor status : $step : $status")
-            log.info("Executor message: $step : $message")
-            log.info("Executor logs   : $step : $logs")
+            log.info("Executor logs   : $step : $executionLogs")
+            log.info("Executor artifacts: $step : $artifacts")
         }
     }
 
@@ -300,22 +300,20 @@ open class ComponentRemotePythonExecutor(
      * Utility function to set the output properties and errors of the executor node, in case of errors
      */
     private fun setNodeOutputErrors(
-        status: String,
         step: String,
-        logs: JsonNode = "N/A".asJsonPrimitive(),
-        message: JsonNode,
+        executionLogs: JsonNode = "[]".asJsonPrimitive(),
+        artifacts: JsonNode = "{}".asJsonPrimitive(),
         logging: Boolean = true
     ) {
+        val status = StatusType.FAILURE.name
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
-        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, message)
+        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, executionLogs)
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
 
         if (logging) {
             log.info("Executor status : $step : $status")
-            log.info("Executor message: $step : $message")
-            log.info("Executor logs   : $step : $logs")
+            log.info("Executor logs   : $step : $executionLogs")
+            log.info("Executor artifacts: $step : $artifacts")
         }
-
-        addError(status, step, logs.toString())
     }
 }