/*
- * Copyright © 2019 IBM.
+ * Copyright © 2019 IBM.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.ccsdk.cds.blueprintsprocessor.functions.python.executor
import com.fasterxml.jackson.databind.JsonNode
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.*
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.TimeoutCancellationException
+import kotlinx.coroutines.async
+import kotlinx.coroutines.withTimeout
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.PrepareRemoteEnvInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteIdentifier
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteScriptExecutionInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.StatusType
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ExecutionServiceConstant
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.RemoteScriptExecutionService
-import org.onap.ccsdk.cds.controllerblueprints.core.*
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
+import org.onap.ccsdk.cds.controllerblueprints.core.checkFileExists
+import org.onap.ccsdk.cds.controllerblueprints.core.checkNotBlank
import org.onap.ccsdk.cds.controllerblueprints.core.data.OperationAssignment
+import org.onap.ccsdk.cds.controllerblueprints.core.normalizedFile
+import org.onap.ccsdk.cds.controllerblueprints.core.returnNullIfMissing
+import org.onap.ccsdk.cds.controllerblueprints.core.rootFieldsToMap
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.config.ConfigurableBeanFactory
@ConditionalOnBean(name = [ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION])
@Component("component-remote-python-executor")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-open class ComponentRemotePythonExecutor(private val remoteScriptExecutionService: RemoteScriptExecutionService)
- : AbstractComponentFunction() {
+open class ComponentRemotePythonExecutor(
+ private val remoteScriptExecutionService: RemoteScriptExecutionService,
+ private var bluePrintPropertiesService: BluePrintPropertiesService
+) : AbstractComponentFunction() {
private val log = LoggerFactory.getLogger(ComponentRemotePythonExecutor::class.java)!!
companion object {
+ const val SELECTOR_CMD_EXEC = "blueprintsprocessor.remote-script-command"
const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
const val INPUT_DYNAMIC_PROPERTIES = "dynamic-properties"
const val INPUT_ARGUMENT_PROPERTIES = "argument-properties"
+
const val INPUT_COMMAND = "command"
const val INPUT_PACKAGES = "packages"
const val DEFAULT_SELECTOR = "remote-python"
+ const val INPUT_ENV_PREPARE_TIMEOUT = "env-prepare-timeout"
+ const val INPUT_EXECUTE_TIMEOUT = "execution-timeout"
+ const val STEP_PREPARE_ENV = "prepare-env"
+ const val STEP_EXEC_CMD = "execute-command"
const val ATTRIBUTE_EXEC_CMD_STATUS = "status"
const val ATTRIBUTE_PREPARE_ENV_LOG = "prepare-environment-logs"
const val ATTRIBUTE_EXEC_CMD_LOG = "execute-command-logs"
const val ATTRIBUTE_RESPONSE_DATA = "response-data"
+ const val DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC = 120
+ const val DEFAULT_EXECUTE_TIMEOUT_IN_SEC = 180
}
override suspend fun processNB(executionRequest: ExecutionServiceInput) {
log.debug("Processing : $operationInputs")
+ val isLogResponseEnabled = bluePrintPropertiesService.propertyBeanType("$SELECTOR_CMD_EXEC.response.log.enabled", Boolean::class.java)
+
val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
val blueprintName = bluePrintContext.name()
val blueprintVersion = bluePrintContext.version()
?.rootFieldsToMap()?.toSortedMap()?.values?.joinToString(" ") { formatNestedJsonNode(it) }
val command = getOperationInput(INPUT_COMMAND).asText()
+
+ /**
+ * Timeouts that are specific to the command executor.
+ * Note: the interface->input->timeout is the component level timeout.
+ */
+ val envPrepTimeout = getOptionalOperationInput(INPUT_ENV_PREPARE_TIMEOUT)?.asInt()
+ ?: DEFAULT_ENV_PREPARE_TIMEOUT_IN_SEC
+ val executionTimeout = getOptionalOperationInput(INPUT_EXECUTE_TIMEOUT)?.asInt()
+ ?: DEFAULT_EXECUTE_TIMEOUT_IN_SEC
+
var scriptCommand = command.replace(pythonScript.name, pythonScript.absolutePath)
if (args != null && args.isNotEmpty()) {
scriptCommand = scriptCommand.plus(" ").plus(args)
// If packages are defined, then install in remote server
if (packages != null) {
- val prepareEnvInput = PrepareRemoteEnvInput(requestId = processId,
- remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName,
+ val prepareEnvInput = PrepareRemoteEnvInput(
+ requestId = processId,
+ remoteIdentifier = RemoteIdentifier(
+ blueprintName = blueprintName,
blueprintVersion = blueprintVersion),
- packages = packages
+ packages = packages,
+ timeOut = envPrepTimeout.toLong()
+
)
val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput)
log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}")
- val logs = prepareEnvOutput.response
+ val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response)
val logsEnv = logs.toString().asJsonPrimitive()
setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv)
if (prepareEnvOutput.status != StatusType.SUCCESS) {
- setAttribute(ATTRIBUTE_EXEC_CMD_LOG, "N/A".asJsonPrimitive())
- setNodeOutputErrors(prepareEnvOutput.status.name, logsEnv)
+ val errorMessage = prepareEnvOutput.payload
+ setNodeOutputErrors(prepareEnvOutput.status.name,
+ STEP_PREPARE_ENV,
+ logs,
+ errorMessage,
+ isLogResponseEnabled
+ )
} else {
- setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), logsEnv, "".asJsonPrimitive())
+ setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(),
+ STEP_PREPARE_ENV,
+ logsEnv,
+ "".asJsonPrimitive(),
+ isLogResponseEnabled
+ )
}
+ } else {
+ // set env preparation log to empty...
+ setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, "".asJsonPrimitive())
}
-
- // if Env preparation was successful, then proceed with command execution in this Env
- if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) {
+ } catch (grpcEx: io.grpc.StatusRuntimeException) {
+ val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId)."
+ setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
+ setNodeOutputErrors(status = grpcErrMsg, step = STEP_PREPARE_ENV, error = "${grpcEx.status}".asJsonPrimitive(), logging = isLogResponseEnabled)
+ log.error(grpcErrMsg, grpcEx)
+ addError(grpcErrMsg)
+ } catch (e: Exception) {
+ val timeoutErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)."
+ setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
+ setNodeOutputErrors(status = timeoutErrMsg, step = STEP_PREPARE_ENV, error = "${e.message}".asJsonPrimitive(), logging = isLogResponseEnabled)
+ log.error("Failed to process on remote executor requestId ($processId)", e)
+ addError(timeoutErrMsg)
+ }
+ // if Env preparation was successful, then proceed with command execution in this Env
+ if (bluePrintRuntimeService.getBluePrintError().errors.isEmpty()) {
+ try {
// Populate command execution properties and pass it to the remote server
val properties = dynamicProperties?.returnNullIfMissing()?.rootFieldsToMap() ?: hashMapOf()
val remoteExecutionInput = RemoteScriptExecutionInput(
- requestId = processId,
- remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion),
- command = scriptCommand,
- properties = properties)
- val remoteExecutionOutput = remoteScriptExecutionService.executeCommand(remoteExecutionInput)
+ requestId = processId,
+ remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion),
+ command = scriptCommand,
+ properties = properties,
+ timeOut = implementation.timeout.toLong())
+ val remoteExecutionOutputDeferred = GlobalScope.async {
+ remoteScriptExecutionService.executeCommand(remoteExecutionInput)
+ }
+
+ val remoteExecutionOutput = withTimeout(implementation.timeout * 1000L) {
+ remoteExecutionOutputDeferred.await()
+ }
+
+ checkNotNull(remoteExecutionOutput) {
+ "Error: Request-id $processId did not return a restul from remote command execution."
+ }
val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
if (remoteExecutionOutput.status != StatusType.SUCCESS) {
- setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload)
+ setNodeOutputErrors(remoteExecutionOutput.status.name,
+ STEP_EXEC_CMD,
+ logs,
+ remoteExecutionOutput.payload,
+ isLogResponseEnabled
+ )
} else {
- setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs,
- remoteExecutionOutput.payload)
+ setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(),
+ STEP_EXEC_CMD,
+ logs,
+ remoteExecutionOutput.payload,
+ isLogResponseEnabled
+ )
}
+ } catch (timeoutEx: TimeoutCancellationException) {
+ val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
+ setNodeOutputErrors(status = timeoutErrMsg,
+ step = STEP_EXEC_CMD,
+ logs = "".asJsonPrimitive(),
+ error = "".asJsonPrimitive(),
+ logging = isLogResponseEnabled
+ )
+ log.error(timeoutErrMsg, timeoutEx)
+ } catch (grpcEx: io.grpc.StatusRuntimeException) {
+ val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
+ setNodeOutputErrors(status = timeoutErrMsg,
+ step = STEP_EXEC_CMD,
+ logs = "".asJsonPrimitive(),
+ error = "".asJsonPrimitive(),
+ logging = isLogResponseEnabled
+ )
+ log.error("Command executor time out during GRPC call", grpcEx)
+ } catch (e: Exception) {
+ log.error("Failed to process on remote executor requestId ($processId)", e)
}
- } catch (e: Exception) {
- log.error("Failed to process on remote executor", e)
- } finally {
- remoteScriptExecutionService.close()
}
+ log.debug("Trying to close GRPC channel. request ($processId)")
+ remoteScriptExecutionService.close()
}
override suspend fun recoverNB(runtimeException: RuntimeException, executionRequest: ExecutionServiceInput) {
/**
* Utility function to set the output properties of the executor node
*/
- private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) {
+ private fun setNodeOutputProperties(status: JsonNode, step: String, message: JsonNode, artifacts: JsonNode, logging: Boolean = true) {
setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
- log.info("Executor status : $status")
setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
- log.info("Executor artifacts: $artifacts")
setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
- log.info("Executor message : $message")
+
+ if (logging) {
+ log.info("Executor status : $step : $status")
+ log.info("Executor artifacts: $step : $artifacts")
+ log.info("Executor message : $step : $message")
+ }
}
/**
* Utility function to set the output properties and errors of the executor node, in cas of errors
*/
- private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive() ) {
+ private fun setNodeOutputErrors(
+ status: String,
+ step: String,
+ logs: JsonNode = "N/A".asJsonPrimitive(),
+ error: JsonNode,
+ logging: Boolean = true
+ ) {
setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
- log.info("Executor status : $status")
- setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
- log.info("Executor message : $message")
- setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
- log.info("Executor artifacts: $artifacts")
+ setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
+ setAttribute(ATTRIBUTE_RESPONSE_DATA, "N/A".asJsonPrimitive())
+
+ if (logging) {
+ log.info("Executor status : $step : $status")
+ log.info("Executor message : $step : $error")
+ log.info("Executor logs : $step : $logs")
+ }
- addError(status, ATTRIBUTE_EXEC_CMD_LOG, message.toString())
+ addError(status, step, error.toString())
}
}