Modified CDS Kafka consumersand producers logs to provide more details about the topic of the consumer record being consumed or published.
Refactored the publish callback to make it more readable.
Refactored audit service log error messages.
Issue-ID: CCSDK-3154
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: I7b42930e926bc15ce175974a78d3bfe2f219b0a8
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/** execute the command block */
if (!channel.isClosedForSend) {
channel.send(consumerRecord)
+ log.info(
+ "Channel sent Consumer Record : topic(${consumerRecord.topic()}) " +
+ "partition(${consumerRecord.partition()}) " +
+ "leaderEpoch(${consumerRecord.leaderEpoch().get()}) " +
+ "offset(${consumerRecord.offset()}) " +
+ "key(${consumerRecord.key()})"
+ )
} else {
log.error("Channel is closed to receive message")
}
/*
* Copyright © 2019 IBM.
- * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
}
val callback = Callback { metadata, exception ->
if (exception != null)
- log.error("ERROR : ${exception.message}")
+ log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
else {
- var logMessage = when (clonedMessage) {
- is ExecutionServiceInput ->
- "Request published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- is ExecutionServiceOutput ->
- "Response published to ${metadata.topic()} for CBA: ${clonedMessage.actionIdentifiers.blueprintName} version: ${clonedMessage.actionIdentifiers.blueprintVersion}"
- else -> "Message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers"
- }
- log.info(logMessage)
+ val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
+ "partition(${metadata.partition()}) " +
+ "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
+ log.info(message)
}
}
messageTemplate().send(record, callback)
/** Truncation of error messages */
var truncErrMsg = executionServiceOutput.status.errorMessage
if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
- truncErrMsg = "${truncErrMsg.substring(0, MAX_ERR_MSG_LEN)}" +
+ truncErrMsg = truncErrMsg.substring(0, MAX_ERR_MSG_LEN) +
" [...]. Check Blueprint Processor logs for more information."
}
/** Truncation for Command Executor responses */
stepData = executionServiceOutput.stepData
}
}
+
+ private fun getMessageLogData(message: Any): String {
+ return when (message) {
+ is ExecutionServiceInput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ is ExecutionServiceOutput -> {
+ val actionIdentifiers = message.actionIdentifiers
+ "CBA(${actionIdentifiers.blueprintName}/${actionIdentifiers.blueprintVersion}/${actionIdentifiers.actionName})"
+ }
+ else -> "message($message)"
+ }
+ }
}
/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2021 Bell Canada.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
ph.register()
val key = message.key() ?: UUID.randomUUID().toString()
val value = String(message.value(), Charset.defaultCharset())
- log.trace("Consumed Message : key($key) value($value)")
val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
+ log.info(
+ "Consumed Message : topic(${message.topic()}) " +
+ "partition(${message.partition()}) " +
+ "leaderEpoch(${message.leaderEpoch().get()}) " +
+ "offset(${message.offset()}) " +
+ "key(${message.key()}) " +
+ "CBA(${executionServiceInput.actionIdentifiers.blueprintName}/${executionServiceInput.actionIdentifiers.blueprintVersion}/${executionServiceInput.actionIdentifiers.actionName})"
+ )
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
/*
- * Copyright © 2020 Bell Canada
+ * Copyright © 2021 Bell Canada
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
try {
this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
- } catch (e: Exception) {
- var errMsg =
- if (e.message != null) "ERROR : ${e.message}"
- else "ERROR : Failed to send execution request to Kafka."
- log.error(errMsg)
+ } catch (ex: Exception) {
+ log.error("Failed to publish execution request to Kafka.", ex)
}
}
try {
this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
this.outputInstance!!.sendMessage(key, executionServiceOutput)
- } catch (e: Exception) {
- var errMsg =
- if (e.message != null) "ERROR : $e"
- else "ERROR : Failed to send execution request to Kafka."
- log.error(errMsg)
+ } catch (ex: Exception) {
+ log.error("Failed to publish execution response to Kafka.", ex)
}
}
}
}
}
- } catch (e: Exception) {
- val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
- log.error(errMsg, e)
+ } catch (ex: Exception) {
+ val errMsg = "Couldn't hide sensitive data in the execution request."
+ log.error(errMsg, ex)
clonedExecutionServiceInput.payload.replace(
"$workflowName-request",
- "$errMsg $e".asJsonPrimitive()
+ "$errMsg $ex".asJsonPrimitive()
)
}
return clonedExecutionServiceInput