package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
private val messageLoggerService = MessageLoggerService()
+ companion object {
+ const val MAX_ERR_MSG_LEN = 128
+ }
+
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
return sendMessageNB(messageProducerProperties.topic!!, message)
message: Any,
headers: MutableMap<String, String>?
): Boolean {
- val byteArrayMessage = when (message) {
- is String -> message.toByteArray(Charset.defaultCharset())
- else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+ var clonedMessage = message
+ if (clonedMessage is ExecutionServiceOutput) {
+ clonedMessage = truncateResponse(clonedMessage)
+ }
+
+ val byteArrayMessage = when (clonedMessage) {
+ is String -> clonedMessage.toByteArray(Charset.defaultCharset())
+ else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
}
val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
return kafkaProducer!!
}
+
+ /**
+ * Truncation of BP responses
+ */
+ private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput {
+ /** Truncation of error messages */
+ var truncErrMsg = executionServiceOutput.status.errorMessage
+ if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
+ truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" +
+ " [...]. Check Blueprint Processor logs for more information."
+ }
+ /** Truncation for Command Executor responses */
+ var truncPayload = executionServiceOutput.payload.deepCopy()
+ val workflowName = executionServiceOutput.actionIdentifiers.actionName
+ if (truncPayload.path("$workflowName-response").has("execute-command-logs")) {
+ var cmdExecLogNode = truncPayload.path("$workflowName-response") as ObjectNode
+ cmdExecLogNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive())
+ }
+ return ExecutionServiceOutput().apply {
+ correlationUUID = executionServiceOutput.correlationUUID
+ commonHeader = executionServiceOutput.commonHeader
+ actionIdentifiers = executionServiceOutput.actionIdentifiers
+ status = Status().apply {
+ code = executionServiceOutput.status.code
+ eventType = executionServiceOutput.status.eventType
+ timestamp = executionServiceOutput.status.timestamp
+ errorMessage = truncErrMsg
+ message = executionServiceOutput.status.message
+ }
+ payload = truncPayload
+ stepData = executionServiceOutput.stepData
+ }
+ }
}