execution timeout not respected... 36/99836/5
authorOleg Mitsura <oleg.mitsura@amdocs.com>
Thu, 19 Dec 2019 21:47:14 +0000 (16:47 -0500)
committerOleg Mitsura <oleg.mitsura@amdocs.com>
Mon, 23 Dec 2019 15:20:40 +0000 (10:20 -0500)
Issue-ID: CCSDK-2012

rev1: initial commit
rev2: reworked + added grpc deadline
rev3: wrong err msg..
rev4: timeouts in millis
rev5: timeout defaulting was after logging...
Signed-off-by: Oleg Mitsura <oleg.mitsura@amdocs.com>
Change-Id: I02eb414904cc5dfcf51ff7b7552dafe54857ed1e

ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/RemoteScriptExecutionService.kt

index 6b1f186..3250cd3 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.python.executor
 
 import com.fasterxml.jackson.databind.JsonNode
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.TimeoutCancellationException
+import kotlinx.coroutines.async
+import kotlinx.coroutines.withTimeout
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.*
 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.ExecutionServiceConstant
@@ -128,17 +132,36 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
                 requestId = processId,
                 remoteIdentifier = RemoteIdentifier(blueprintName = blueprintName, blueprintVersion = blueprintVersion),
                 command = scriptCommand,
-                properties = properties)
-            val remoteExecutionOutput = remoteScriptExecutionService.executeCommand(remoteExecutionInput)
+                properties = properties,
+                timeOut = timeout.toLong())
+
+
+            val remoteExecutionOutputDeferred = GlobalScope.async {
+                remoteScriptExecutionService.executeCommand(remoteExecutionInput)
+            }
+
+            val remoteExecutionOutput = withTimeout(timeout * 1000L) {
+                remoteExecutionOutputDeferred.await()
+            }
+
+            checkNotNull(remoteExecutionOutput) {
+                "Error: Request-id $processId did not return a restul from remote command execution."
+            }
 
             val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
             if (remoteExecutionOutput.status != StatusType.SUCCESS) {
-                setNodeOutputErrors(remoteExecutionOutput.status.name,logs, remoteExecutionOutput.payload)
+                setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload)
             } else {
                 setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs,
-                                        remoteExecutionOutput.payload)
+                    remoteExecutionOutput.payload)
             }
 
+        } catch (timeoutEx: TimeoutCancellationException) {
+            setNodeOutputErrors(status = "Command executor timed out after $timeout seconds", message = "".asJsonPrimitive())
+            log.error("Command executor timed out after $timeout seconds", timeoutEx)
+        } catch (grpcEx: io.grpc.StatusRuntimeException) {
+            setNodeOutputErrors(status = "Command executor timed out in GRPC call", message = "${grpcEx.status}".asJsonPrimitive())
+            log.error("Command executor time out during GRPC call", grpcEx)
         } catch (e: Exception) {
             log.error("Failed to process on remote executor", e)
         } finally {
@@ -176,7 +199,7 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
     /**
      * Utility function to set the output properties and errors of the executor node, in cas of errors
      */
-    private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()  ) {
+    private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) {
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
         setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
index 5163a93..7a0f167 100644 (file)
@@ -85,12 +85,13 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
         check(operationName.isNotEmpty()) { "couldn't get Operation name for step($stepName)" }
 
         val operationResolvedProperties = bluePrintRuntimeService
-                .resolveNodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+            .resolveNodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
 
         this.operationInputs.putAll(operationResolvedProperties)
 
         val timeout = this.operationInputs.getOptionalAsInt(BluePrintConstants.PROPERTY_CURRENT_TIMEOUT)
         timeout?.let { this.timeout = timeout }
+        log.debug("DEBUG::: AbstractComponentFunction prepareRequestNB.timeout ($timeout)")
 
         return executionRequest
     }
@@ -99,11 +100,11 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
         log.info("Preparing Response...")
         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
-        var status = Status()
+        val status = Status()
         try {
             // Resolve the Output Expression
             val stepOutputs = bluePrintRuntimeService
-                    .resolveNodeTemplateInterfaceOperationOutputs(nodeTemplateName, interfaceName, operationName)
+                .resolveNodeTemplateInterfaceOperationOutputs(nodeTemplateName, interfaceName, operationName)
 
             val stepOutputData = StepData().apply {
                 name = stepName
@@ -123,7 +124,8 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
     override suspend fun applyNB(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         try {
             prepareRequestNB(executionServiceInput)
-            withTimeout((timeout * 1000).toLong()) {
+            withTimeout(timeout * 1000L) {
+                log.debug("DEBUG::: AbstractComponentFunction.withTimeout section $timeout seconds")
                 processNB(executionServiceInput)
             }
         } catch (runtimeException: RuntimeException) {
@@ -135,7 +137,7 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
 
     fun getOperationInput(key: String): JsonNode {
         return operationInputs[key]
-                ?: throw BluePrintProcessorException("couldn't get the operation input($key) value.")
+            ?: throw BluePrintProcessorException("couldn't get the operation input($key) value.")
     }
 
     fun getOptionalOperationInput(key: String): JsonNode? {
@@ -189,4 +191,4 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
         return file.readNBLines()
     }
 
-}
\ No newline at end of file
+}
index d6146e1..5801af9 100644 (file)
@@ -32,6 +32,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
 import org.springframework.context.annotation.Scope
 import org.springframework.stereotype.Service
+import java.util.concurrent.TimeUnit
 
 interface RemoteScriptExecutionService {
     suspend fun init(selector: Any)
@@ -42,7 +43,7 @@ interface RemoteScriptExecutionService {
 
 @Service(ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION)
 @ConditionalOnProperty(prefix = "blueprintprocessor.remoteScriptCommand", name = arrayOf("enabled"),
-        havingValue = "true", matchIfMissing = false)
+    havingValue = "true", matchIfMissing = false)
 @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
     : RemoteScriptExecutionService {
@@ -54,12 +55,12 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
 
     override suspend fun init(selector: Any) {
         // Get the GRPC Client Service based on selector
-        val grpcClientService: BluePrintGrpcClientService
-        if (selector is JsonNode) {
-            grpcClientService = bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector)
+        val grpcClientService: BluePrintGrpcClientService = if (selector is JsonNode) {
+            bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector)
         } else {
-            grpcClientService = bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector.toString())
+            bluePrintGrpcLibPropertyService.blueprintGrpcClientService(selector.toString())
         }
+
         // Get the GRPC Channel
         channel = grpcClientService.channel()
         // Create Non Blocking Stub
@@ -70,9 +71,10 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
         }
     }
 
-    override suspend fun prepareEnv(prepareEnvInput: PrepareRemoteEnvInput)
-            : RemoteScriptExecutionOutput {
-        val grpResponse = commandExecutorServiceGrpc.prepareEnv(prepareEnvInput.asGrpcData())
+    override suspend fun prepareEnv(prepareEnvInput: PrepareRemoteEnvInput): RemoteScriptExecutionOutput {
+        val grpResponse = commandExecutorServiceGrpc
+            .withDeadlineAfter(prepareEnvInput.timeOut * 1000, TimeUnit.MILLISECONDS)
+            .prepareEnv(prepareEnvInput.asGrpcData())
 
         checkNotNull(grpResponse.status) {
             "failed to get GRPC prepare env response status for requestId(${prepareEnvInput.requestId})"
@@ -85,18 +87,19 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
     }
 
     override suspend fun executeCommand(remoteExecutionInput: RemoteScriptExecutionInput)
-            : RemoteScriptExecutionOutput {
-
-        val grpResponse = commandExecutorServiceGrpc.executeCommand(remoteExecutionInput.asGrpcData())
+        : RemoteScriptExecutionOutput {
+        val grpResponse =
+            commandExecutorServiceGrpc
+                .withDeadlineAfter(remoteExecutionInput.timeOut * 1000, TimeUnit.MILLISECONDS)
+                .executeCommand(remoteExecutionInput.asGrpcData())
 
         checkNotNull(grpResponse.status) {
             "failed to get GRPC response status for requestId(${remoteExecutionInput.requestId})"
         }
 
-        val remoteScriptExecutionOutput = grpResponse.asJavaData()
         log.debug("Received response from command server for requestId(${remoteExecutionInput.requestId})")
+        return grpResponse.asJavaData()
 
-        return remoteScriptExecutionOutput
     }
 
     override suspend fun close() {
@@ -116,33 +119,33 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
         }
 
         return PrepareEnvInput.newBuilder()
-                .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
-                .setRequestId(this.requestId)
-                .setCorrelationId(correlationId)
-                .setTimeOut(this.timeOut.toInt())
-                .addAllPackages(packageList)
-                .setProperties(this.properties.asGrpcData())
-                .build()
+            .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
+            .setRequestId(this.requestId)
+            .setCorrelationId(correlationId)
+            .setTimeOut(this.timeOut.toInt())
+            .addAllPackages(packageList)
+            .setProperties(this.properties.asGrpcData())
+            .build()
     }
 
     fun RemoteScriptExecutionInput.asGrpcData(): ExecutionInput {
         val correlationId = this.correlationId ?: this.requestId
         return ExecutionInput.newBuilder()
-                .setRequestId(this.requestId)
-                .setCorrelationId(correlationId)
-                .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
-                .setCommand(this.command)
-                .setTimeOut(this.timeOut.toInt())
-                .setProperties(this.properties.asGrpcData())
-                .setTimestamp(Timestamp.getDefaultInstance())
-                .build()
+            .setRequestId(this.requestId)
+            .setCorrelationId(correlationId)
+            .setIdentifiers(this.remoteIdentifier!!.asGrpcData())
+            .setCommand(this.command)
+            .setTimeOut(this.timeOut.toInt())
+            .setProperties(this.properties.asGrpcData())
+            .setTimestamp(Timestamp.getDefaultInstance())
+            .build()
     }
 
     fun RemoteIdentifier.asGrpcData(): Identifiers? {
         return Identifiers.newBuilder()
-                .setBlueprintName(this.blueprintName)
-                .setBlueprintVersion(this.blueprintVersion)
-                .build()
+            .setBlueprintName(this.blueprintName)
+            .setBlueprintVersion(this.blueprintVersion)
+            .build()
     }
 
     fun Map<String, JsonNode>.asGrpcData(): Struct {
@@ -153,10 +156,10 @@ class GrpcRemoteScriptExecutionService(private val bluePrintGrpcLibPropertyServi
 
     fun ExecutionOutput.asJavaData(): RemoteScriptExecutionOutput {
         return RemoteScriptExecutionOutput(
-                requestId = this.requestId,
-                response = this.responseList,
-                status = StatusType.valueOf(this.status.name),
-                payload =  payload.jsonAsJsonType()
+            requestId = this.requestId,
+            response = this.responseList,
+            status = StatusType.valueOf(this.status.name),
+            payload = payload.jsonAsJsonType()
         )
     }