Truncate messages published on Kafka / Spike: Define solution for logs separation 60/106360/6
author     Julien Fontaine <julien.fontaine@bell.ca>
           Mon, 20 Apr 2020 15:53:41 +0000 (11:53 -0400)
committer  Julien Fontaine <julien.fontaine@bell.ca>
           Tue, 5 May 2020 22:17:52 +0000 (18:17 -0400)
Refactoring of the cmd-exec component
  - Improve the display of error messages within the response
  - Work around the gRPC message size limit (4 MB) by truncating error messages and cmd-exec logs when they exceed 3 MB
Truncation of BP responses (< 4 KB) before publishing them to the Kafka audit topics (see the sketch below)
  - Truncate error messages in every response when needed
  - Truncate cmd-exec logs in cmd-exec responses
(Spike) Add a flag in application.properties to enable/disable logging of cmd-exec responses on the BP side
(Fix) Correct a regression in BP processing with Kafka
(Fix) Change the default SSL endpoint identification algorithm
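
For reference, a minimal sketch (not the shipped code) of the error-message truncation rule applied before a BP response is published to the Kafka audit topics. The 128-character limit and the placeholder text mirror KafkaMessageProducerService in the diff below; truncateErrorMessage is an illustrative name only:

  // Hypothetical helper illustrating the truncation rule; the real logic lives in
  // KafkaMessageProducerService.truncateResponse().
  const val MAX_ERR_MSG_LEN = 128

  fun truncateErrorMessage(errorMessage: String?): String? =
      if (errorMessage != null && errorMessage.length > MAX_ERR_MSG_LEN)
          errorMessage.substring(0, MAX_ERR_MSG_LEN) +
              " [...]. Check Blueprint Processor logs for more information."
      else
          errorMessage

Cmd-exec responses are additionally trimmed by replacing their execute-command-logs node with a pointer to the Command Executor logs, as shown in the producer diff below.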

Issue-ID: CCSDK-2326
Change-Id: If4d0e661117d1dd156cf19c95774824e754d870a
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
13 files changed:
ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
ms/command-executor/src/main/docker/Dockerfile
ms/command-executor/src/main/python/command_executor_handler.py
ms/command-executor/src/main/python/command_executor_server.py
ms/command-executor/src/main/python/utils.py

ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
index bf5e23b..fb32d9a 100755 (executable)
@@ -69,9 +69,9 @@ blueprintsprocessor.grpcclient.py-executor.trustCertCollection=src/main/resource
 
 # Blueprint Processor File Execution and Handling Properties
 ### use absolute paths if testing inside docker
-### blueprintsprocessor.blueprintDeployPath=/opt/app/onap/blueprints/deploy
-### blueprintsprocessor.blueprintArchivePath=/opt/app/onap/blueprints/archive
-### blueprintsprocessor.blueprintWorkingPath=/opt/app/onap/blueprints/working
+#blueprintsprocessor.blueprintDeployPath=/opt/app/onap/blueprints/deploy
+#blueprintsprocessor.blueprintArchivePath=/opt/app/onap/blueprints/archive
+#blueprintsprocessor.blueprintWorkingPath=/opt/app/onap/blueprints/working
 
 
 # db
@@ -120,10 +120,12 @@ blueprintsprocessor.netconfExecutor.enabled=true
 blueprintsprocessor.restConfExecutor.enabled=true
 blueprintsprocessor.cliExecutor.enabled=true
 ### If enabling remote python executor, set this value to true
-### blueprintsprocessor.remoteScriptCommand.enabled=true
+#blueprintsprocessor.remoteScriptCommand.enabled=true
 blueprintsprocessor.remoteScriptCommand.enabled=false
+blueprintsprocessor.remote-script-command.response.log.enabled=false
 
 # Kafka-message-lib Configurations
+## Request consumer
 blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
 blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
 blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
@@ -141,6 +143,12 @@ blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
 #blueprintsprocessor.messageconsumer.self-service-api.scramUsername=test-user
 #blueprintsprocessor.messageconsumer.self-service-api.scramPassword=testUserPassword
 
+## Response producer
+blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.clientId=producer-id
+blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
+
 # Kafka audit service Configurations
 ## Audit request
 blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
ms/blueprintsprocessor/functions/python-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutor.kt
index d66e8b3..d4c8841 100644 (file)
@@ -21,6 +21,7 @@ import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.TimeoutCancellationException
 import kotlinx.coroutines.async
 import kotlinx.coroutines.withTimeout
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.PrepareRemoteEnvInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteIdentifier
@@ -47,11 +48,15 @@ import org.springframework.stereotype.Component
 @ConditionalOnBean(name = [ExecutionServiceConstant.SERVICE_GRPC_REMOTE_SCRIPT_EXECUTION])
 @Component("component-remote-python-executor")
 @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
-open class ComponentRemotePythonExecutor(private val remoteScriptExecutionService: RemoteScriptExecutionService) : AbstractComponentFunction() {
+open class ComponentRemotePythonExecutor(
+    private val remoteScriptExecutionService: RemoteScriptExecutionService,
+    private var bluePrintPropertiesService: BluePrintPropertiesService
+) : AbstractComponentFunction() {
 
     private val log = LoggerFactory.getLogger(ComponentRemotePythonExecutor::class.java)!!
 
     companion object {
+        const val SELECTOR_CMD_EXEC = "blueprintsprocessor.remote-script-command"
         const val INPUT_ENDPOINT_SELECTOR = "endpoint-selector"
         const val INPUT_DYNAMIC_PROPERTIES = "dynamic-properties"
         const val INPUT_ARGUMENT_PROPERTIES = "argument-properties"
@@ -62,6 +67,8 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
         const val INPUT_ENV_PREPARE_TIMEOUT = "env-prepare-timeout"
         const val INPUT_EXECUTE_TIMEOUT = "execution-timeout"
 
+        const val STEP_PREPARE_ENV = "prepare-env"
+        const val STEP_EXEC_CMD = "execute-command"
         const val ATTRIBUTE_EXEC_CMD_STATUS = "status"
         const val ATTRIBUTE_PREPARE_ENV_LOG = "prepare-environment-logs"
         const val ATTRIBUTE_EXEC_CMD_LOG = "execute-command-logs"
@@ -74,6 +81,8 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
 
         log.debug("Processing : $operationInputs")
 
+        val isLogResponseEnabled = bluePrintPropertiesService.propertyBeanType("$SELECTOR_CMD_EXEC.response.log.enabled", Boolean::class.java)
+
         val bluePrintContext = bluePrintRuntimeService.bluePrintContext()
         val blueprintName = bluePrintContext.name()
         val blueprintVersion = bluePrintContext.version()
@@ -142,15 +151,25 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
                 )
                 val prepareEnvOutput = remoteScriptExecutionService.prepareEnv(prepareEnvInput)
                 log.info("$ATTRIBUTE_PREPARE_ENV_LOG - ${prepareEnvOutput.response}")
-                val logs = prepareEnvOutput.response
+                val logs = JacksonUtils.jsonNodeFromObject(prepareEnvOutput.response)
                 val logsEnv = logs.toString().asJsonPrimitive()
                 setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, logsEnv)
 
                 if (prepareEnvOutput.status != StatusType.SUCCESS) {
-                    setAttribute(ATTRIBUTE_EXEC_CMD_LOG, "N/A".asJsonPrimitive())
-                    setNodeOutputErrors(prepareEnvOutput.status.name, logsEnv)
+                    val errorMessage = prepareEnvOutput.payload
+                    setNodeOutputErrors(prepareEnvOutput.status.name,
+                            STEP_PREPARE_ENV,
+                            logs,
+                            errorMessage,
+                            isLogResponseEnabled
+                    )
                 } else {
-                    setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(), logsEnv, "".asJsonPrimitive())
+                    setNodeOutputProperties(prepareEnvOutput.status.name.asJsonPrimitive(),
+                            STEP_PREPARE_ENV,
+                            logsEnv,
+                            "".asJsonPrimitive(),
+                            isLogResponseEnabled
+                    )
                 }
             } else {
                 // set env preparation log to empty...
@@ -159,13 +178,13 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
         } catch (grpcEx: io.grpc.StatusRuntimeException) {
             val grpcErrMsg = "Command failed during env. preparation... timeout($envPrepTimeout) requestId ($processId)."
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, grpcErrMsg.asJsonPrimitive())
-            setNodeOutputErrors(status = grpcErrMsg, message = "${grpcEx.status}".asJsonPrimitive())
+            setNodeOutputErrors(status = grpcErrMsg, step = STEP_PREPARE_ENV, error = "${grpcEx.status}".asJsonPrimitive(), logging = isLogResponseEnabled)
             log.error(grpcErrMsg, grpcEx)
             addError(grpcErrMsg)
         } catch (e: Exception) {
             val timeoutErrMsg = "Command executor failed during env. preparation.. timeout($envPrepTimeout) requestId ($processId)."
             setAttribute(ATTRIBUTE_PREPARE_ENV_LOG, e.message.asJsonPrimitive())
-            setNodeOutputErrors(status = timeoutErrMsg, message = "${e.message}".asJsonPrimitive())
+            setNodeOutputErrors(status = timeoutErrMsg, step = STEP_PREPARE_ENV, error = "${e.message}".asJsonPrimitive(), logging = isLogResponseEnabled)
             log.error("Failed to process on remote executor requestId ($processId)", e)
             addError(timeoutErrMsg)
         }
@@ -195,18 +214,37 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
                 }
                 val logs = JacksonUtils.jsonNodeFromObject(remoteExecutionOutput.response)
                 if (remoteExecutionOutput.status != StatusType.SUCCESS) {
-                    setNodeOutputErrors(remoteExecutionOutput.status.name, logs, remoteExecutionOutput.payload)
+                    setNodeOutputErrors(remoteExecutionOutput.status.name,
+                            STEP_EXEC_CMD,
+                            logs,
+                            remoteExecutionOutput.payload,
+                            isLogResponseEnabled
+                    )
                 } else {
-                    setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(), logs,
-                        remoteExecutionOutput.payload)
+                    setNodeOutputProperties(remoteExecutionOutput.status.name.asJsonPrimitive(),
+                            STEP_EXEC_CMD,
+                            logs,
+                            remoteExecutionOutput.payload,
+                            isLogResponseEnabled
+                    )
                 }
             } catch (timeoutEx: TimeoutCancellationException) {
                 val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
-                setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive())
+                setNodeOutputErrors(status = timeoutErrMsg,
+                        step = STEP_EXEC_CMD,
+                        logs = "".asJsonPrimitive(),
+                        error = "".asJsonPrimitive(),
+                        logging = isLogResponseEnabled
+                )
                 log.error(timeoutErrMsg, timeoutEx)
             } catch (grpcEx: io.grpc.StatusRuntimeException) {
                 val timeoutErrMsg = "Command executor timed out executing after $executionTimeout seconds requestId ($processId)"
-                setNodeOutputErrors(status = timeoutErrMsg, message = "".asJsonPrimitive())
+                setNodeOutputErrors(status = timeoutErrMsg,
+                        step = STEP_EXEC_CMD,
+                        logs = "".asJsonPrimitive(),
+                        error = "".asJsonPrimitive(),
+                        logging = isLogResponseEnabled
+                )
                 log.error("Command executor time out during GRPC call", grpcEx)
             } catch (e: Exception) {
                 log.error("Failed to process on remote executor requestId ($processId)", e)
@@ -234,25 +272,38 @@ open class ComponentRemotePythonExecutor(private val remoteScriptExecutionServic
     /**
      * Utility function to set the output properties of the executor node
      */
-    private fun setNodeOutputProperties(status: JsonNode, message: JsonNode, artifacts: JsonNode) {
+    private fun setNodeOutputProperties(status: JsonNode, step: String, message: JsonNode, artifacts: JsonNode, logging: Boolean = true) {
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status)
-        log.info("Executor status   : $status")
         setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
-        log.info("Executor artifacts: $artifacts")
         setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
-        log.info("Executor message  : $message")
+
+        if (logging) {
+            log.info("Executor status   : $step : $status")
+            log.info("Executor artifacts: $step : $artifacts")
+            log.info("Executor message  : $step : $message")
+        }
     }
 
     /**
      * Utility function to set the output properties and errors of the executor node, in cas of errors
      */
-    private fun setNodeOutputErrors(status: String, message: JsonNode, artifacts: JsonNode = "".asJsonPrimitive()) {
+    private fun setNodeOutputErrors(
+        status: String,
+        step: String,
+        logs: JsonNode = "N/A".asJsonPrimitive(),
+        error: JsonNode,
+        logging: Boolean = true
+    ) {
         setAttribute(ATTRIBUTE_EXEC_CMD_STATUS, status.asJsonPrimitive())
-        log.info("Executor status   : $status")
-        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, message)
-        log.info("Executor message  : $message")
-        setAttribute(ATTRIBUTE_RESPONSE_DATA, artifacts)
-        log.info("Executor artifacts: $artifacts")
-        addError(status, ATTRIBUTE_EXEC_CMD_LOG, message.toString())
+        setAttribute(ATTRIBUTE_EXEC_CMD_LOG, logs)
+        setAttribute(ATTRIBUTE_RESPONSE_DATA, "N/A".asJsonPrimitive())
+
+        if (logging) {
+            log.info("Executor status   : $step : $status")
+            log.info("Executor message  : $step : $error")
+            log.info("Executor logs     : $step : $logs")
+        }
+
+        addError(status, step, error.toString())
     }
 }
ms/blueprintsprocessor/functions/python-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/python/executor/ComponentRemotePythonExecutorTest.kt
index 5e57b9e..d4edf4b 100644 (file)
@@ -22,6 +22,7 @@ import io.mockk.every
 import io.mockk.mockk
 import kotlinx.coroutines.runBlocking
 import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.PrepareRemoteEnvInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.RemoteScriptExecutionInput
@@ -47,7 +48,10 @@ class ComponentRemotePythonExecutorTest {
         runBlocking {
             val remoteScriptExecutionService = MockRemoteScriptExecutionService()
 
-            val componentRemotePythonExecutor = ComponentRemotePythonExecutor(remoteScriptExecutionService)
+            val componentRemotePythonExecutor = ComponentRemotePythonExecutor(
+                    remoteScriptExecutionService,
+                    mockk<BluePrintPropertiesService>()
+            )
 
             val executionServiceInput =
                 JacksonUtils.readValueFromClassPathFile(
@@ -88,7 +92,10 @@ class ComponentRemotePythonExecutorTest {
     fun testComponentRemotePythonExecutorProcessNB() {
         runBlocking {
             val remoteScriptExecutionService = MockRemoteScriptExecutionService()
-            val componentRemotePythonExecutor = ComponentRemotePythonExecutor(remoteScriptExecutionService)
+            val componentRemotePythonExecutor = ComponentRemotePythonExecutor(
+                    remoteScriptExecutionService,
+                    mockk<BluePrintPropertiesService>()
+            )
             val bluePrintRuntime = mockk<DefaultBluePrintRuntimeService>("123456-1000")
 
             every { bluePrintRuntime.getBluePrintError() } answers { BluePrintError() } // successful case.
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index ac35fbf..b07d643 100644 (file)
@@ -78,7 +78,7 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer
     var keystore: String? = null
     var keystorePassword: String? = null
     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
-    var sslEndpointIdentificationAlgorithm: String = ""
+    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
 
     override fun getConfig(): HashMap<String, Any> {
         val configProps = super.getConfig()
@@ -142,7 +142,7 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer
     var keystore: String? = null
     var keystorePassword: String? = null
     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
-    var sslEndpointIdentificationAlgorithm: String = ""
+    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
 
     override fun getConfig(): HashMap<String, Any> {
         val configProps = super.getConfig()
@@ -218,7 +218,7 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer
     var keystore: String? = null
     var keystorePassword: String? = null
     var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
-    var sslEndpointIdentificationAlgorithm: String = ""
+    var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
 
     override fun getConfig(): HashMap<String, Any> {
         val configProps = super.getConfig()
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 931f052..e4991d2 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import com.fasterxml.jackson.databind.node.ObjectNode
 import org.apache.commons.lang.builder.ToStringBuilder
 import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.header.internals.RecordHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.slf4j.LoggerFactory
@@ -39,6 +43,10 @@ class KafkaMessageProducerService(
 
     private val messageLoggerService = MessageLoggerService()
 
+    companion object {
+        const val MAX_ERR_MSG_LEN = 128
+    }
+
     override suspend fun sendMessageNB(message: Any): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
         return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -54,9 +62,14 @@ class KafkaMessageProducerService(
         message: Any,
         headers: MutableMap<String, String>?
     ): Boolean {
-        val byteArrayMessage = when (message) {
-            is String -> message.toByteArray(Charset.defaultCharset())
-            else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+        var clonedMessage = message
+        if (clonedMessage is ExecutionServiceOutput) {
+            clonedMessage = truncateResponse(clonedMessage)
+        }
+
+        val byteArrayMessage = when (clonedMessage) {
+            is String -> clonedMessage.toByteArray(Charset.defaultCharset())
+            else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
         }
 
         val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
@@ -85,4 +98,37 @@ class KafkaMessageProducerService(
 
         return kafkaProducer!!
     }
+
+    /**
+     * Truncation of BP responses
+     */
+    private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput {
+        /** Truncation of error messages */
+        var truncErrMsg = executionServiceOutput.status.errorMessage
+        if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
+            truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" +
+                    " [...]. Check Blueprint Processor logs for more information."
+        }
+        /** Truncation for Command Executor responses */
+        var truncPayload = executionServiceOutput.payload.deepCopy()
+        val workflowName = executionServiceOutput.actionIdentifiers.actionName
+        if (truncPayload.path("$workflowName-response").has("execute-command-logs")) {
+            var cmdExecLogNode = truncPayload.path("$workflowName-response") as ObjectNode
+            cmdExecLogNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive())
+        }
+        return ExecutionServiceOutput().apply {
+            correlationUUID = executionServiceOutput.correlationUUID
+            commonHeader = executionServiceOutput.commonHeader
+            actionIdentifiers = executionServiceOutput.actionIdentifiers
+            status = Status().apply {
+                code = executionServiceOutput.status.code
+                eventType = executionServiceOutput.status.eventType
+                timestamp = executionServiceOutput.status.timestamp
+                errorMessage = truncErrMsg
+                message = executionServiceOutput.status.message
+            }
+            payload = truncPayload
+            stepData = executionServiceOutput.stepData
+        }
+    }
 }
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index ac08dc7..fdf6e48 100644 (file)
@@ -214,7 +214,7 @@ open class BlueprintMessageConsumerServiceTest {
                 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
                 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
                 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
-                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
                 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
                 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                         "username=\"sample-user\" " +
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 72a47ed..da73949 100644 (file)
@@ -109,7 +109,7 @@ open class BlueprintMessageProducerServiceTest {
                 SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
                 SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
                 SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
-                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+                SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
                 SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
                 SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
                         "username=\"sample-user\" " +
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index 49f2a50..a95af81 100644 (file)
@@ -51,13 +51,15 @@ open class BluePrintProcessingKafkaConsumer(
 
     companion object {
         const val CONSUMER_SELECTOR = "self-service-api"
+        const val PRODUCER_SELECTOR = "self-service-api"
     }
 
     @EventListener(ApplicationReadyEvent::class)
     fun setupMessageListener() = runBlocking {
         try {
             log.info(
-                "Setting up message consumer($CONSUMER_SELECTOR)"
+                "Setting up message consumer($CONSUMER_SELECTOR)" +
+                        "message producer($PRODUCER_SELECTOR)..."
             )
 
             /** Get the Message Consumer Service **/
@@ -72,6 +74,18 @@ open class BluePrintProcessingKafkaConsumer(
                 throw BluePrintProcessorException("failed to create consumer service ${e.message}")
             }
 
+            /** Get the Message Producer Service **/
+            val blueprintMessageProducerService = try {
+                bluePrintMessageLibPropertyService
+                        .blueprintMessageProducerService(PRODUCER_SELECTOR)
+            } catch (e: BluePrintProcessorException) {
+                val errorMsg = "Failed creating Kafka producer message service."
+                throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
+                        "Wrong Kafka selector provided or internal error in Kafka service.")
+            } catch (e: Exception) {
+                throw BluePrintProcessorException("failed to create producer service ${e.message}")
+            }
+
             launch {
                 /** Subscribe to the consumer topics */
                 val additionalConfig: MutableMap<String, Any> = hashMapOf()
@@ -82,7 +96,8 @@ open class BluePrintProcessingKafkaConsumer(
                             ph.register()
                             log.trace("Consumed Message : $message")
                             val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
-                            executionServiceHandler.doProcess(executionServiceInput)
+                            val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+                            blueprintMessageProducerService.sendMessage(executionServiceOutput)
                         } catch (e: Exception) {
                             log.error("failed in processing the consumed message : $message", e)
                         } finally {
@@ -93,7 +108,8 @@ open class BluePrintProcessingKafkaConsumer(
             }
         } catch (e: Exception) {
             log.error(
-                "failed to start message consumer($CONSUMER_SELECTOR) ", e
+                "failed to start message consumer($CONSUMER_SELECTOR) " +
+                        "message producer($PRODUCER_SELECTOR) ", e
             )
         }
     }
@@ -102,7 +118,8 @@ open class BluePrintProcessingKafkaConsumer(
     fun shutdownMessageListener() = runBlocking {
         try {
             log.info(
-                "Shutting down message consumer($CONSUMER_SELECTOR)"
+                "Shutting down message consumer($CONSUMER_SELECTOR)" +
+                        "message producer($PRODUCER_SELECTOR)..."
             )
             blueprintMessageConsumerService.shutDown()
             ph.arriveAndAwaitAdvance()
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index 129e7a5..1c5d47c 100644 (file)
@@ -24,6 +24,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
@@ -48,12 +49,9 @@ class KafkaPublishAuditService(
     private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
     private val blueprintsProcessorCatalogService: BluePrintCatalogService
 ) : PublishAuditService {
-
     private var inputInstance: BlueprintMessageProducerService? = null
     private var outputInstance: BlueprintMessageProducerService? = null
-
     private lateinit var correlationUUID: String
-
     private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString())
 
     companion object {
@@ -127,7 +125,8 @@ class KafkaPublishAuditService(
             correlationUUID = executionServiceInput.correlationUUID
             commonHeader = executionServiceInput.commonHeader
             actionIdentifiers = executionServiceInput.actionIdentifiers
-            payload = executionServiceInput.payload
+            payload = executionServiceInput.payload.deepCopy()
+            stepData = executionServiceInput.stepData
         }
 
         val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName
@@ -173,8 +172,7 @@ class KafkaPublishAuditService(
 
             sensitiveParameters.forEach { sensitiveParameter ->
                 if (workflowProperties.has(sensitiveParameter)) {
-                    workflowProperties.remove(sensitiveParameter)
-                    workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED)
+                    workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
                 }
             }
         }
ms/command-executor/src/main/docker/Dockerfile
index c381260..7a20469 100644 (file)
@@ -3,7 +3,7 @@ FROM python:3.6-slim
 ENV GRPC_PYTHON_VERSION 1.20.0
 RUN python -m pip install --upgrade pip
 RUN pip install grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION}
-RUN pip install virtualenv==16.7.9
+RUN pip install virtualenv==16.7.9 pympler==0.8
 
 RUN groupadd -r onap && useradd -r -g onap onap
 
ms/command-executor/src/main/python/command_executor_handler.py
index 1e6f03b..0c476b2 100644 (file)
@@ -43,43 +43,48 @@ class CommandExecutorHandler():
     def is_installed(self):
         return os.path.exists(self.installed)
 
-    def prepare_env(self, request, results):
+    def prepare_env(self, request):
+        results_log = []
         if not self.is_installed():
             create_venv_status = self.create_venv()
-            if not create_venv_status["cds_is_successful"]:
-                err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status["err_msg"])
+            if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
+                err_msg = "ERROR: failed to prepare environment for request {} due to error in creating virtual Python env. Original error {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY])
                 self.logger.error(err_msg)
-                return utils.build_ret_data(False, err_msg)
+                return utils.build_ret_data(False, error=err_msg)
 
             activate_venv_status = self.activate_venv()
-            if not activate_venv_status["cds_is_successful"]:
-                err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status["err_msg"])
+            if not activate_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
+                err_msg = "ERROR: failed to prepare environment for request {} due Python venv_activation. Original error {}".format(self.blueprint_id, activate_venv_status[utils.ERR_MSG_KEY])
                 self.logger.error(err_msg)
-                return utils.build_ret_data(False, err_msg)
+                return utils.build_ret_data(False, error=err_msg)
             try:
                 with open(self.installed, "w+") as f:
-                    if not self.install_packages(request, CommandExecutor_pb2.pip, f, results):
-                        return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id))
+                    if not self.install_packages(request, CommandExecutor_pb2.pip, f, results_log):
+                        err_msg = "ERROR: failed to prepare environment for request {} during pip package install.".format(self.blueprint_id)
+                        return utils.build_ret_data(False, results_log=results_log, error=err_msg)
                     f.write("\r\n") # TODO: is \r needed?
-                    results.append("\n")
-                    if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results):
-                        return utils.build_ret_data(False, "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id))
+                    results_log.append("\n")
+                    if not self.install_packages(request, CommandExecutor_pb2.ansible_galaxy, f, results_log):
+                        err_msg = "ERROR: failed to prepare environment for request {} during Ansible install.".format(self.blueprint_id)
+                        return utils.build_ret_data(False, results_log=results_log, error=err_msg)
             except Exception as ex:
                 err_msg = "ERROR: failed to prepare environment for request {} during installing packages. Exception: {}".format(self.blueprint_id, ex)
                 self.logger.error(err_msg)
-                return utils.build_ret_data(False, err_msg)
+                return utils.build_ret_data(False, error=err_msg)
         else:
             try:
                 with open(self.installed, "r") as f:
-                    results.append(f.read())
+                    results_log.append(f.read())
             except Exception as ex:
-                return utils.build_ret_data(False, "ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex))
+                err_msg="ERROR: failed to prepare environment during reading 'installed' file {}. Exception: {}".format(self.installed, ex)
+                return utils.build_ret_data(False, error=err_msg)
 
         # deactivate_venv(blueprint_id)
-        return utils.build_ret_data(True, "")
+        return utils.build_ret_data(True, results_log=results_log)
 
-    def execute_command(self, request, results):
-        payload_result = {}
+    def execute_command(self, request):
+        results_log = []
+        result = {}
         # workaround for when packages are not specified, we may not want to go through the install step
         # can just call create_venv from here.
         if not self.is_installed():
@@ -87,16 +92,14 @@ class CommandExecutorHandler():
         try:
             if not self.is_installed():
                 create_venv_status = self.create_venv
-                if not create_venv_status["cds_is_successful"]:
-                    err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status["err_msg"])
-                    results.append(err_msg)
-                    return utils.build_ret_data(False, err_msg)
+                if not create_venv_status[utils.CDS_IS_SUCCESSFUL_KEY]:
+                    err_msg = "{} - Failed to execute command during venv creation. Original error: {}".format(self.blueprint_id, create_venv_status[utils.ERR_MSG_KEY])
+                    return utils.build_ret_data(False, error=err_msg)
             activate_response = self.activate_venv()
-            if not activate_response["cds_is_successful"]:
-                orig_error = activate_response["err_msg"]
+            if not activate_response[utils.CDS_IS_SUCCESSFUL_KEY]:
+                orig_error = activate_response[utils.ERR_MSG_KEY]
                 err_msg = "{} - Failed to execute command during environment activation. Original error: {}".format(self.blueprint_id, orig_error)
-                results.append(err_msg) #TODO: get rid of results and just rely on the return data struct.
-                return utils.build_ret_data(False, err_msg)
+                return utils.build_ret_data(False, error=err_msg)
 
             cmd = "cd " + self.venv_home
 
@@ -131,26 +134,25 @@ class CommandExecutorHandler():
                         payload = '\n'.join(payload_section)
                         msg = email.parser.Parser().parsestr(payload)
                         for part in msg.get_payload():
-                            payload_result = json.loads(part.get_payload())
+                            result = json.loads(part.get_payload())
                     if output and not is_payload_section:
                         self.logger.info(output.strip())
-                        results.append(output.strip())
+                        results_log.append(output.strip())
                     else:
                         payload_section.append(output.strip())
                 rc = newProcess.poll()
         except Exception as e:
             err_msg = "{} - Failed to execute command. Error: {}".format(self.blueprint_id, e)
-            self.logger.info(err_msg)
-            results.append(e)
-            payload_result.update(utils.build_ret_data(False, err_msg))
-            return payload_result
+            return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
 
         # deactivate_venv(blueprint_id)
         #Since return code is only used to check if it's zero (success), we can just return success flag instead.
         self.logger.debug("python return_code : {}".format(rc))
-        is_execution_successful = rc == 0
-        payload_result.update(utils.build_ret_data(is_execution_successful, ""))
-        return payload_result
+        if rc == 0:
+            return utils.build_ret_data(True, results=result, results_log=results_log)
+        else:
+            err_msg = "{} - Something wrong happened during command execution. See execute command logs for more information.".format(self.blueprint_id)
+            return utils.build_ret_data(False, results=result, results_log=results_log, error=err_msg)
 
     def install_packages(self, request, type, f, results):
         success = self.install_python_packages('UTILITY', results)
ms/command-executor/src/main/python/command_executor_server.py
index 3435e22..2070976 100644 (file)
@@ -22,9 +22,6 @@ import proto.CommandExecutor_pb2_grpc as CommandExecutor_pb2_grpc
 from command_executor_handler import CommandExecutorHandler
 import utils
 
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
-
 class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServicer):
 
     def __init__(self):
@@ -35,14 +32,14 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi
         self.logger.info("{} - Received prepareEnv request".format(blueprint_id))
         self.logger.info(request)
 
-        results = []
         handler = CommandExecutorHandler(request)
-        prepare_env_response = handler.prepare_env(request, results)
-        if not prepare_env_response["cds_is_successful"]:
-            self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, results))
-            return utils.build_grpc_response(request, results, {}, False)
-        self.logger.info("{} - Package installation logs {}".format(blueprint_id, results))
-        return utils.build_grpc_response(request, results, {}, True)
+        prepare_env_response = handler.prepare_env(request)
+        if prepare_env_response[utils.CDS_IS_SUCCESSFUL_KEY]:
+            self.logger.info("{} - Package installation logs {}".format(blueprint_id, prepare_env_response[utils.RESULTS_LOG_KEY]))
+        else:
+            self.logger.info("{} - Failed to prepare python environment. {}".format(blueprint_id, prepare_env_response[utils.ERR_MSG_KEY]))
+        self.logger.info("Prepare Env Response returned : %s" % prepare_env_response)
+        return utils.build_grpc_response(request.requestId, prepare_env_response)
 
     def executeCommand(self, request, context):
         blueprint_id = utils.get_blueprint_id(request)
@@ -53,13 +50,15 @@ class CommandExecutorServer(CommandExecutor_pb2_grpc.CommandExecutorServiceServi
         log_results = []
         payload_result = {}
         handler = CommandExecutorHandler(request)
-        payload_result = handler.execute_command(request, log_results)
-        if not payload_result["cds_is_successful"]:
-            self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, log_results))
-        else:
+        exec_cmd_response = handler.execute_command(request)
+        if exec_cmd_response[utils.CDS_IS_SUCCESSFUL_KEY]:
             self.logger.info("{} - Execution finished successfully.".format(blueprint_id))
+            self.logger.info("{} - Log Results {}: ".format(blueprint_id, exec_cmd_response[utils.RESULTS_LOG_KEY]))
+            self.logger.info("{} - Results : {}".format(blueprint_id, exec_cmd_response[utils.RESULTS_KEY]))
+        else:
+            self.logger.info("{} - Failed to executeCommand. {}".format(blueprint_id, exec_cmd_response[utils.ERR_MSG_KEY]))
 
-        ret = utils.build_grpc_response(request, log_results, payload_result, payload_result["cds_is_successful"])
-        self.logger.info("Payload returned %s" % payload_result)
+        ret = utils.build_grpc_response(request.requestId, exec_cmd_response)
+        self.logger.info("Response returned : {}".format(exec_cmd_response))
 
         return ret
\ No newline at end of file
ms/command-executor/src/main/python/utils.py
index 574be51..b982416 100644 (file)
@@ -17,7 +17,13 @@ from google.protobuf.timestamp_pb2 import Timestamp
 
 import proto.CommandExecutor_pb2 as CommandExecutor_pb2
 import json
+from pympler import asizeof
 
+CDS_IS_SUCCESSFUL_KEY = "cds_is_successful"
+ERR_MSG_KEY = "err_msg"
+RESULTS_KEY = "results"
+RESULTS_LOG_KEY = "results_log"
+TRUNC_MSG_LEN = 3 * 1024 * 1024
 
 def get_blueprint_id(request):
   blueprint_name = request.identifiers.blueprintName
@@ -25,27 +31,47 @@ def get_blueprint_id(request):
   return blueprint_name + '/' + blueprint_version
 
 # Create a response for grpc. Fills in the timestamp as well as removes cds_is_successful element
-def build_grpc_response(request, log_results, payload_return, is_success=False):
-  if is_success:
+def build_grpc_response(request_id, response):
+  if response[CDS_IS_SUCCESSFUL_KEY]:
     status = CommandExecutor_pb2.SUCCESS
+    payload = json.dumps(response[RESULTS_KEY])
   else:
     status = CommandExecutor_pb2.FAILURE
+    # truncate error message if too heavy
+    if asizeof.asizeof(response[ERR_MSG_KEY]) > TRUNC_MSG_LEN:
+      response[ERR_MSG_KEY] = "{} [...]. Check command executor logs for more information.".format(response[ERR_MSG_KEY][:TRUNC_MSG_LEN])
+    payload = json.dumps(response[ERR_MSG_KEY])
+
+  # truncate cmd-exec logs if too heavy
+  response[RESULTS_LOG_KEY] = truncate_cmd_exec_logs(response[RESULTS_LOG_KEY])
 
   timestamp = Timestamp()
   timestamp.GetCurrentTime()
 
-  if "cds_is_successful" in payload_return:
-    payload_return.pop('cds_is_successful')
-  payload_str = json.dumps(payload_return)
-  return CommandExecutor_pb2.ExecutionOutput(requestId=request.requestId,
-                                             response=log_results,
+  return CommandExecutor_pb2.ExecutionOutput(requestId=request_id,
+                                             response=response[RESULTS_LOG_KEY],
                                              status=status,
-                                             payload=payload_str,
+                                             payload=payload,
                                              timestamp=timestamp)
 
-# build a return data structure which may contain an error message
-def build_ret_data(cds_is_successful, err_msg):
-  ret_data = {"cds_is_successful": cds_is_successful }
-  if err_msg != "":
-    ret_data["err_msg"] = err_msg
+# build a ret data structure
+def build_ret_data(cds_is_successful, results={}, results_log=[], error=None):
+  ret_data = {
+            CDS_IS_SUCCESSFUL_KEY: cds_is_successful,
+            RESULTS_KEY: results,
+            RESULTS_LOG_KEY: results_log
+         }
+  if error:
+    ret_data[ERR_MSG_KEY] = error
   return ret_data
+
+def truncate_cmd_exec_logs(logs):
+    truncated_logs = []
+    truncated_logs_memsize = 0
+    for log in logs:
+        truncated_logs_memsize += asizeof.asizeof(log)
+        if truncated_logs_memsize > TRUNC_MSG_LEN:
+            truncated_logs.append("Execution logs exceeds the maximum size allowed. Check command executor logs to view the execute-command-logs.")
+            break
+        truncated_logs.append(log)
+    return truncated_logs
\ No newline at end of file