- // if (topic.isTransactionEnabled() || true) {
- if (istransEnable) {
- // As part of DMaaP changes we are wrapping the original
- // message into a json object
- // and then this json object is further wrapped into message
- // object before publishing,
- // so extracting the original message from the message
- // object for matching with filter.
- final JSONObject jsonMessage = new JSONObject(msg.getMessage());
- message = jsonMessage.getString("message");
- } else {
- message = msg.getMessage();
+ String transactionid = "";
+ try {
+ // String msgRecord = msg;
+ JSONObject jsonMessage = new JSONObject(msgRecord);
+ String[] keys = JSONObject.getNames(jsonMessage);
+ boolean wrapheader1 = false;
+ boolean wrapheader2 = false;
+ boolean found_attr3 = false;
+ String wrapElement1 = "message";
+ String wrapElement2 = "msgWrapMR";
+ String transIdElement = "transactionId";
+ if (null != keys) {
+ for (String key : keys) {
+ if (key.equals(wrapElement1)) {
+ wrapheader1 = true;
+ } else if (key.equals(wrapElement2)) {
+ wrapheader2 = true;
+ } else if (key.equals(transIdElement)) {
+ found_attr3 = true;
+ transactionid = jsonMessage.getString(key);
+ }
+ }
+ }
+
+ // returns contents of attribute 1 if both attributes
+ // present, otherwise
+ // the whole msg
+ if (wrapheader2 && found_attr3) {
+ message = jsonMessage.getString(wrapElement2);
+ } else if (wrapheader1 && found_attr3) {
+ message = jsonMessage.getString(wrapElement1);
+ } else {
+ message = msgRecord.getMessage();
+ }
+ // jsonMessage = extractMessage(jsonMessage ,
+ // "message","msgWrapMR","transactionId");
+ istransType = true;
+ } catch (JSONException e) { // This check is required for the
+ // message sent by MR AAF flow but
+ // consumed by UEB ACL flow which
+ // wont expect transaction id in
+ // cambria client api
+ // Ignore
+ log.info("JSON Exception logged when the message is non JSON Format");
+ } catch (Exception exp) {
+ log.info("****Some Exception occured for writing messages in topic" + topic.getName()
+ + " Exception" + exp);
+ }
+ if (message == null || message.equals("")) {
+ istransType = false;
+ message = msgRecord.getMessage();