bump the version
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / service / impl / EventsServiceImpl.java
index 4ca6446..73a373e 100644 (file)
@@ -94,7 +94,7 @@ import com.att.nsa.util.rrConvertor;
 @Service
 public class EventsServiceImpl implements EventsService {
        // private static final Logger LOG =
-       // Logger.getLogger(EventsServiceImpl.class);
+       
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
 
        private static final String BATCH_LENGTH = "event.batch.length";
@@ -103,10 +103,10 @@ public class EventsServiceImpl implements EventsService {
        private DMaaPErrorMessages errorMessages;
        
        //@Autowired
-       //KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+       
 
        // @Value("${metrics.send.cambria.topic}")
-       // private String metricsTopic;
+       
 
        public DMaaPErrorMessages getErrorMessages() {
                return errorMessages;
@@ -133,7 +133,7 @@ public class EventsServiceImpl implements EventsService {
                        CambriaApiException, IOException, DMaaPAccessDeniedException {
                final long startTime = System.currentTimeMillis();
                final HttpServletRequest req = ctx.getRequest();
-       //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"+kafkaLiveLockAvoider);
+       
                boolean isAAFTopic = false;
                // was this host blacklisted?
                final String remoteAddr = Utils.getRemoteAddress(ctx);
@@ -158,7 +158,7 @@ public class EventsServiceImpl implements EventsService {
                if (strtimeoutMS != null)
                        timeoutMs = Integer.parseInt(strtimeoutMS);
                // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
-               // CambriaConstants.kNoTimeout);
+               
                if (req.getParameter("timeout") != null) {
                        timeoutMs = Integer.parseInt(req.getParameter("timeout"));
                }
@@ -214,7 +214,7 @@ public class EventsServiceImpl implements EventsService {
                // if headers are not provided then user will be null
                if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
                        // the topic name will be sent by the client
-                       // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
+                       
                        DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
                        String permission = aaf.aafPermissionString(topic, "sub");
                        if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
@@ -234,7 +234,7 @@ public class EventsServiceImpl implements EventsService {
                logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
                                + " " + clientId);
                Consumer c = null;
-               // String localclientId = clientId;
+               
                String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                                "clusterhostid");
                if (null == lhostId) {
@@ -376,7 +376,7 @@ public class EventsServiceImpl implements EventsService {
                String topicNameStd = null;
 
                // topicNameStd=
-               // ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
+               
                topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
                                "enforced.topic.name.AAF");
                String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -404,7 +404,7 @@ public class EventsServiceImpl implements EventsService {
                if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
                                && !topic.equalsIgnoreCase(metricTopicname))) {
                        // the topic name will be sent by the client
-                       // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
+               
                        DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
                        String permission = aaf.aafPermissionString(topic, "pub");
                        if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
@@ -481,16 +481,16 @@ public class EventsServiceImpl implements EventsService {
                // start processing, building a batch to push to the backend
                final long startMs = System.currentTimeMillis();
                long count = 0;
-               long maxEventBatch = 1024 * 16;
+               long maxEventBatch = 1024L* 16;
                String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
                if (null != batchlen)
                        maxEventBatch = Long.parseLong(batchlen);
                // long maxEventBatch =
-               // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
-               final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+               
+               final LinkedList<Publisher.message> batch = new LinkedList<>();
                // final ArrayList<KeyedMessage<String, String>> kms = new
-               // ArrayList<KeyedMessage<String, String>>();
-               final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
+       
+               final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
                try {
                        // for each message...
                        Publisher.message m = null;
@@ -499,7 +499,7 @@ public class EventsServiceImpl implements EventsService {
                                batch.add(m);
                                // final KeyedMessage<String, String> data = new
                                // KeyedMessage<String, String>(topic, m.getKey(),
-                               // m.getMessage());
+                       
                                // kms.add(data);
                                final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
                                                m.getMessage());
@@ -509,7 +509,7 @@ public class EventsServiceImpl implements EventsService {
                                final int sizeNow = batch.size();
                                if (sizeNow > maxEventBatch) {
                                        // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-                                       // kms);
+                               
                                        // kms.clear();
                                        ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
                                        pms.clear();
@@ -523,7 +523,7 @@ public class EventsServiceImpl implements EventsService {
                        final int sizeNow = batch.size();
                        if (sizeNow > 0) {
                                // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-                               // kms);
+                       
                                // kms.clear();
                                ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
                                pms.clear();
@@ -592,15 +592,15 @@ public class EventsServiceImpl implements EventsService {
                // start processing, building a batch to push to the backend
                final long startMs = System.currentTimeMillis();
                long count = 0;
-               long maxEventBatch = 1024 * 16;
+               long maxEventBatch = 1024L * 16;
                String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
                if (null != evenlen)
                        maxEventBatch = Long.parseLong(evenlen);
                // final long maxEventBatch =
-               // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+               
                final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
                // final ArrayList<KeyedMessage<String, String>> kms = new
-               // ArrayList<KeyedMessage<String, String>>();
+               
                final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
                Publisher.message m = null;
                int messageSequence = 1;
@@ -610,7 +610,7 @@ public class EventsServiceImpl implements EventsService {
                SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
 
                // LOG.warn("Batch Start Id: " +
-               // Utils.getFromattedBatchSequenceId(batchId));
+       
                try {
                        // for each message...
                        batchId = DMaaPContext.getBatchID();
@@ -620,13 +620,13 @@ public class EventsServiceImpl implements EventsService {
                        while ((m = events.next()) != null) {
 
                                // LOG.warn("Batch Start Id: " +
-                               // Utils.getFromattedBatchSequenceId(batchId));
+                               
 
                                addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
                                                transactionEnabled);
                                messageSequence++;
 
-                               // add the message to the batch
+                       
                                batch.add(m);
 
                                responseTransactionId = m.getLogDetails().getTransactionId();
@@ -636,7 +636,7 @@ public class EventsServiceImpl implements EventsService {
                                jsonObject.put("transactionId", responseTransactionId);
                                // final KeyedMessage<String, String> data = new
                                // KeyedMessage<String, String>(topic, m.getKey(),
-                               // jsonObject.toString());
+                       
                                // kms.add(data);
                                final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
                                                m.getMessage());