Truncate message published on Kafka / Spike: Define solution for logs separation
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / KafkaMessageProducerService.kt
index 931f052..e4991d2 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import com.fasterxml.jackson.databind.node.ObjectNode
 import org.apache.commons.lang.builder.ToStringBuilder
 import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.header.internals.RecordHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
 import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.slf4j.LoggerFactory
@@ -39,6 +43,10 @@ class KafkaMessageProducerService(
 
     private val messageLoggerService = MessageLoggerService()
 
+    companion object {
+        const val MAX_ERR_MSG_LEN = 128
+    }
+
     override suspend fun sendMessageNB(message: Any): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
         return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -54,9 +62,14 @@ class KafkaMessageProducerService(
         message: Any,
         headers: MutableMap<String, String>?
     ): Boolean {
-        val byteArrayMessage = when (message) {
-            is String -> message.toByteArray(Charset.defaultCharset())
-            else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+        var clonedMessage = message
+        if (clonedMessage is ExecutionServiceOutput) {
+            clonedMessage = truncateResponse(clonedMessage)
+        }
+
+        val byteArrayMessage = when (clonedMessage) {
+            is String -> clonedMessage.toByteArray(Charset.defaultCharset())
+            else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
         }
 
         val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
@@ -85,4 +98,37 @@ class KafkaMessageProducerService(
 
         return kafkaProducer!!
     }
+
+    /**
+     * Truncation of BP responses
+     */
+    private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput {
+        /** Truncation of error messages */
+        var truncErrMsg = executionServiceOutput.status.errorMessage
+        if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
+            truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" +
+                    " [...]. Check Blueprint Processor logs for more information."
+        }
+        /** Truncation for Command Executor responses */
+        var truncPayload = executionServiceOutput.payload.deepCopy()
+        val workflowName = executionServiceOutput.actionIdentifiers.actionName
+        if (truncPayload.path("$workflowName-response").has("execute-command-logs")) {
+            var cmdExecLogNode = truncPayload.path("$workflowName-response") as ObjectNode
+            cmdExecLogNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive())
+        }
+        return ExecutionServiceOutput().apply {
+            correlationUUID = executionServiceOutput.correlationUUID
+            commonHeader = executionServiceOutput.commonHeader
+            actionIdentifiers = executionServiceOutput.actionIdentifiers
+            status = Status().apply {
+                code = executionServiceOutput.status.code
+                eventType = executionServiceOutput.status.eventType
+                timestamp = executionServiceOutput.status.timestamp
+                errorMessage = truncErrMsg
+                message = executionServiceOutput.status.message
+            }
+            payload = truncPayload
+            stepData = executionServiceOutput.stepData
+        }
+    }
 }