Truncate message published on Kafka / Spike: Define solution for logs separation
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / python-executor / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / python / executor / ComponentRemotePythonExecutor.kt
index d66e8b3..d4c8841 100644 (file)
@@ -21,6 +21,7 @@ import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.TimeoutCancellationException
 import kotlinx.coroutines.async
 import kotlinx.coroutines.withTimeout
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.PrepareRemoteEnvInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteIdentifier
@@ -47,11 +48,15 @@ import org.springframework.stereotype.Component
 @ConditionalOnBean(name = [ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION])
 @Component("component-remote-python-executor")
 @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-open class ComponentRemotePythonExecutor(private val remoteScriptExecutionService: RemoteScriptExecutionService) : AbstractComponentFunction() {
+open class ComponentRemotePythonExecutor(
+    private val remoteScriptExecutionService: RemoteScriptExecutionService,
+    private var bluePrintPropertiesService: BluePrintPropertiesService
+) : AbstractComponentFunction() {
 
     private val log = LoggerFactory.getLogger(ComponentRemotePythonExecutor::class.java)!!
 
     companion object {
+        const val SELECTOR_CMD_EXEC = "blueprintsprocessor.remote-script-command"
         const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
         const val INPUT_DYNAMIC_PROPERTIES = "dynamic-properties"
         const val INPUT_ARGUMENT_PROPERTIES = "argument-properties"
@@ -62,6 +67,8 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
         const val INPUT_ENV_PREPARE_TIMEOUT = "env-prepare-timeout"
         const val INPUT_EXECUTE_TIMEOUT = "execution-timeout"
 
+        const val STEP_PREPARE_ENV = "prepare-env"
+        const val STEP_EXEC_CMD = "execute-command"
         const val ATTRIBUTE_EXEC_CMD_STATUS = "status"
         const val ATTRIBUTE_PREPARE_ENV_LOG = "prepare-environment-logs"
         const val ATTRIBUTE_EXEC_CMD_LOG = "execute-command-logs"
@@ -74,6 +81,8 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
 
         log.debug("Processing : $operationInputs")
 
+        val isLogResponseEnabled = bluePrintPropertiesService.propertyBeanType("$SELECTOR_CMD_EXEC.response.log.enabled", Boolean::class.java)
+
         val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
         val blueprintName = bluePrintContext.name()
         val blueprintVersion = bluePrintContext.version()
@@ -142,15 +151,25 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
                 )
                 val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput)
                 log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}")
-                val logs = prepareEnvOutput.response
+                val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response)
                 val logsEnv = logs.toString().asJsonPrimitive()
                 setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv)
 
                 if (prepareEnvOutput.status != StatusType.SUCCESS) {
-                    setAttribute(ATTRIBUTE_EXEC_CMD_LOG, "N/A".asJsonPrimitive())
-                    setNodeOutputErrors(prepareEnvOutput.status.name, logsEnv)
+                    val errorMessage = prepareEnvOutput.payload
+                    setNodeOutputErrors(prepareEnvOutput.status.name,
+                            STEP_PREPARE_ENV,
+                            logs,
+                            errorMessage,
+                            isLogResponseEnabled
+                    )
                 } else {
-                    setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), logsEnv, "".asJsonPrimitive())
+                    setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(),
+                            STEP_PREPARE_ENV,
+                            logsEnv,
+                            "".asJsonPrimitive(),
+                            isLogResponseEnabled
+                    )
                 }
             } else {
                 // set env preparation log to empty...
@@ -159,13 +178,13 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
         } catch (grpcEx: io.grpc.StatusRuntimeException) {
             val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId)."
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
-            setNodeOutputErrors(status = grpcErrMsg, message = "${grpcEx.status}".asJsonPrimitive())
+            setNodeOutputErrors(status = grpcErrMsg, step = STEP_PREPARE_ENV, error = "${grpcEx.status}".asJsonPrimitive(), logging = isLogResponseEnabled)
             log.error(grpcErrMsg, grpcEx)
             addError(grpcErrMsg)
         } catch (e: Exception) {
             val timeoutErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)."
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
-            setNodeOutputErrors(status = timeoutErrMsg, message = "${e.message}".asJsonPrimitive())
+            setNodeOutputErrors(status = timeoutErrMsg, step = STEP_PREPARE_ENV, error = "${e.message}".asJsonPrimitive(), logging = isLogResponseEnabled)
             log.error("Failed to process on remote executor requestId ($processId)", e)
             addError(timeoutErrMsg)
         }
@@ -195,18 +214,37 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
                 }
                 val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
                 if (remoteExecutionOutput.status != StatusType.SUCCESS) {
-                    setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload)
+                    setNodeOutputErrors(remoteExecutionOutput.status.name,
+                            STEP_EXEC_CMD,
+                            logs,
+                            remoteExecutionOutput.payload,
+                            isLogResponseEnabled
+                    )
                 } else {
-                    setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs,
-                        remoteExecutionOutput.payload)
+                    setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(),
+                            STEP_EXEC_CMD,
+                            logs,
+                            remoteExecutionOutput.payload,
+                            isLogResponseEnabled
+                    )
                 }
             } catch (timeoutEx: TimeoutCancellationException) {
                 val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
-                setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive())
+                setNodeOutputErrors(status = timeoutErrMsg,
+                        step = STEP_EXEC_CMD,
+                        logs = "".asJsonPrimitive(),
+                        error = "".asJsonPrimitive(),
+                        logging = isLogResponseEnabled
+                )
                 log.error(timeoutErrMsg, timeoutEx)
             } catch (grpcEx: io.grpc.StatusRuntimeException) {
                 val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
-                setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive())
+                setNodeOutputErrors(status = timeoutErrMsg,
+                        step = STEP_EXEC_CMD,
+                        logs = "".asJsonPrimitive(),
+                        error = "".asJsonPrimitive(),
+                        logging = isLogResponseEnabled
+                )
                 log.error("Command executor time out during GRPC call", grpcEx)
             } catch (e: Exception) {
                 log.error("Failed to process on remote executor requestId ($processId)", e)
@@ -234,25 +272,38 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
     /**
      * Utility function to set the output properties of the executor node
      */
-    private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) {
+    private fun setNodeOutputProperties(status: JsonNode, step: String, message: JsonNode, artifacts: JsonNode, logging: Boolean = true) {
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
-        log.info("Executor status   : $status")
         setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
-        log.info("Executor artifacts: $artifacts")
         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
-        log.info("Executor message  : $message")
+
+        if (logging) {
+            log.info("Executor status   : $step : $status")
+            log.info("Executor artifacts: $step : $artifacts")
+            log.info("Executor message  : $step : $message")
+        }
     }
 
     /**
      * Utility function to set the output properties and errors of the executor node, in cas of errors
      */
-    private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) {
+    private fun setNodeOutputErrors(
+        status: String,
+        step: String,
+        logs: JsonNode = "N/A".asJsonPrimitive(),
+        error: JsonNode,
+        logging: Boolean = true
+    ) {
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
-        log.info("Executor status   : $status")
-        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
-        log.info("Executor message  : $message")
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
-        log.info("Executor artifacts: $artifacts")
-        addError(status, ATTRIBUTE_EXEC_CMD_LOG, message.toString())
+        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, "N/A".asJsonPrimitive())
+
+        if (logging) {
+            log.info("Executor status   : $step : $status")
+            log.info("Executor message  : $step : $error")
+            log.info("Executor logs     : $step : $logs")
+        }
+
+        addError(status, step, error.toString())
     }
 }