String topicNameStd = null;
// topicNameStd=
- // ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
+
topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
"enforced.topic.name.AAF");
String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
&& !topic.equalsIgnoreCase(metricTopicname))) {
// the topic name will be sent by the client
- // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
+
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "pub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
if (null != batchlen)
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
- // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+
final LinkedList<Publisher.message> batch = new LinkedList<>();
// final ArrayList<KeyedMessage<String, String>> kms = new
- // ArrayList<KeyedMessage<String, String>>();
+
final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
try {
// for each message...
batch.add(m);
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
- // m.getMessage());
+
// kms.add(data);
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
m.getMessage());
final int sizeNow = batch.size();
if (sizeNow > maxEventBatch) {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
+
// kms.clear();
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
pms.clear();
final int sizeNow = batch.size();
if (sizeNow > 0) {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
+
// kms.clear();
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
pms.clear();
if (null != evenlen)
maxEventBatch = Long.parseLong(evenlen);
// final long maxEventBatch =
- // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+
final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
// final ArrayList<KeyedMessage<String, String>> kms = new
- // ArrayList<KeyedMessage<String, String>>();
+
final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
Publisher.message m = null;
int messageSequence = 1;
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
// LOG.warn("Batch Start Id: " +
- // Utils.getFromattedBatchSequenceId(batchId));
+
try {
// for each message...
batchId = DMaaPContext.getBatchID();
while ((m = events.next()) != null) {
// LOG.warn("Batch Start Id: " +
- // Utils.getFromattedBatchSequenceId(batchId));
+
addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
transactionEnabled);
messageSequence++;
- // add the message to the batch
+
batch.add(m);
responseTransactionId = m.getLogDetails().getTransactionId();
jsonObject.put("transactionId", responseTransactionId);
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
- // jsonObject.toString());
+
// kms.add(data);
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
m.getMessage());