SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
// LOG.warn("Batch Start Id: " +
- // Utils.getFromattedBatchSequenceId(batchId));
+
try {
// for each message...
batchId = DMaaPContext.getBatchID();
while ((m = events.next()) != null) {
// LOG.warn("Batch Start Id: " +
- // Utils.getFromattedBatchSequenceId(batchId));
+
addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
transactionEnabled);
messageSequence++;
- // add the message to the batch
+
batch.add(m);
responseTransactionId = m.getLogDetails().getTransactionId();
jsonObject.put("transactionId", responseTransactionId);
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
- // jsonObject.toString());
+
// kms.add(data);
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
m.getMessage());