Merge "Update INFO.yaml file"
authorRam Koya <rk541m@att.com>
Thu, 17 Jan 2019 21:36:52 +0000 (21:36 +0000)
committerGerrit Code Review <gerrit@onap.org>
Thu, 17 Jan 2019 21:36:52 +0000 (21:36 +0000)
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java

index 93374fb..b66a251 100644 (file)
@@ -77,7 +77,7 @@ public class Kafka011Consumer implements Consumer {
                fId = id;
                fCreateTimeMs = System.currentTimeMillis();
                fLastTouch = fCreateTimeMs;
-               fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+               fPendingMsgs = new LinkedBlockingQueue<>();
                fLogTag = fGroup + "(" + fId + ")/" + fTopic;
                offset = 0;
                state = Kafka011Consumer.State.OPENED;
@@ -113,7 +113,7 @@ public class Kafka011Consumer implements Consumer {
        public synchronized Consumer.Message nextMessage() {
 
                try {
-                       if (fPendingMsgs.size() > 0) {
+                       if (fPendingMsgs.isEmpty()) {
                                return makeMessage(fPendingMsgs.take());
                        }
                } catch (InterruptedException x) {
index 387b667..2f436ec 100644 (file)
@@ -637,7 +637,7 @@ public class EventsServiceImpl implements EventsService {
                                // final KeyedMessage<String, String> data = new
                                // KeyedMessage<String, String>(topic, m.getKey(),
                        
-                               // kms.add(data);
+                               
                                final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
                                                m.getMessage());
 
@@ -650,7 +650,7 @@ public class EventsServiceImpl implements EventsService {
                                                        + batchId + "]");
                                        try {
                                                // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-                                               // kms);
+                                       
                                                ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
                                                // transactionLogs(batch);
                                                for (message msg : batch) {
@@ -681,7 +681,7 @@ public class EventsServiceImpl implements EventsService {
                                        metricsSet.publishTick(sizeNow);
                                        publishBatchCount = sizeNow;
                                        count += sizeNow;
-                                       // batchId++;
+                                       
                                        String endTime = sdf.format(new Date());
                                        LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
                                                        + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
@@ -698,9 +698,9 @@ public class EventsServiceImpl implements EventsService {
                                                + batchId + "]");
                                try {
                                        // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-                                       // kms);
+                                       
                                        ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
-                                       // transactionLogs(batch);
+                                       
                                        for (message msg : batch) {
                                                LogDetails logDetails = msg.getLogDetails();
                                                LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
@@ -727,7 +727,7 @@ public class EventsServiceImpl implements EventsService {
                                pms.clear();
                                metricsSet.publishTick(sizeNow);
                                count += sizeNow;
-                               // batchId++;
+                       
                                String endTime = sdf.format(new Date());
                                publishBatchCount = sizeNow;
                                LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
@@ -855,12 +855,12 @@ public class EventsServiceImpl implements EventsService {
                return logDetails;
        }
 
-       /*
-        * public String getMetricsTopic() { return metricsTopic; }
-        * 
-        * public void setMetricsTopic(String metricsTopic) { this.metricsTopic =
-        * metricsTopic; }
-        */
+       
+        
+       
+        
+       
+