Merge changes Ie4716415,I6ce7ca81
author    sunil unnava <su622b@att.com>
          Mon, 11 Feb 2019 14:24:03 +0000 (14:24 +0000)
committer Gerrit Code Review <gerrit@onap.org>
          Mon, 11 Feb 2019 14:24:03 +0000 (14:24 +0000)
* changes:
  Sonar major issues
  Sonar major issues

INFO.yaml [new file with mode: 0644]
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java

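Both "Sonar major issues" changes apply the same remediation patterns across the Kafka backend: replace repeated generic type arguments with the diamond operator, pass the caught exception object to the logger instead of appending getMessage() so stack traces are preserved, restore the thread's interrupt flag after catching InterruptedException, and drop commented-out dead code. A minimal sketch of the combined pattern (slf4j and the class name are illustrative; the project uses its own logger wrapper):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SonarPatternSketch {
        private static final Logger log = LoggerFactory.getLogger(SonarPatternSketch.class);
        // Diamond operator: the element type is inferred from the field declaration.
        private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

        public String next() {
            try {
                if (!pending.isEmpty()) {
                    return pending.take();
                }
            } catch (InterruptedException x) {
                // Log the Throwable itself (keeps the stack trace) and restore
                // the interrupt flag instead of swallowing the interruption.
                log.warn("pending msg take() interrupted", x);
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }
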
diff --git a/INFO.yaml b/INFO.yaml
new file mode 100644
index 0000000..6a109bd
--- /dev/null
+++ b/INFO.yaml
@@ -0,0 +1,44 @@
+---
+project: 'dmaap-messagerouter-msgrtr'
+project_creation_date: '2017-08-29'
+lifecycle_state: 'Incubation'
+project_lead: &onap_releng_ptl
+    name: 'Ram Koya'
+    email: 'rk541m@att.com'
+    id: 'rampi_k'
+    company: 'ATT'
+    timezone: 'America/Dallas'
+primary_contact: *onap_releng_ptl
+issue_tracking:
+    type: 'jira'
+    url: 'https://jira.onap.org/projects/DMAAP'
+    key: 'DMAAP'
+meetings:
+    - type: 'zoom'
+      agenda: ''
+      url: 'https://wiki.onap.org/pages/viewpage.action?pageId=13599275'
+      server: 'n/a'
+      channel: 'n/a'
+      repeats: 'weekly'
+      time: '13:00 UTC'
+committers:
+    - <<: *onap_releng_ptl
+    - name: 'Ramdas Sawant'
+      email: 'rs873m@att.com'
+      company: 'ATT'
+      id: 'rs873m'
+      timezone: 'America/Dallas'
+    - name: 'Varun Gudisena'
+      email: 'vg411h@att.com'
+      company: 'ATT'
+      id: 'vg411h'
+      timezone: 'America/Dallas'
+tsc:
+    approval: 'https://lists.onap.org/pipermail/onap-tsc'
+    changes:
+        - type: 'Removal'
+          name: 'Habib Madani'
+        - type: 'Removal'
+          name: 'Xinhui Li'
+        - type: 'Removal'
+          name: 'Jing Wang'
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
index 93374fb..b3f3c8f 100644
@@ -77,7 +77,7 @@ public class Kafka011Consumer implements Consumer {
                fId = id;
                fCreateTimeMs = System.currentTimeMillis();
                fLastTouch = fCreateTimeMs;
-               fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+               fPendingMsgs = new LinkedBlockingQueue<>();
                fLogTag = fGroup + "(" + fId + ")/" + fTopic;
                offset = 0;
                state = Kafka011Consumer.State.OPENED;
@@ -113,12 +113,13 @@ public class Kafka011Consumer implements Consumer {
        public synchronized Consumer.Message nextMessage() {
 
                try {
-                       if (fPendingMsgs.size() > 0) {
+                       if (!fPendingMsgs.isEmpty()) {
                                return makeMessage(fPendingMsgs.take());
                        }
                } catch (InterruptedException x) {
                        log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
                                        x);
+                       Thread.currentThread().interrupt();
                }
 
                Callable<Boolean> run = new Callable<Boolean>() {
@@ -135,11 +136,10 @@ public class Kafka011Consumer implements Consumer {
                                        }
 
                                } catch (KafkaException x) {
-                                       log.debug(fLogTag + ": KafkaException " + x.getMessage());
+                                       log.debug(fLogTag + ": KafkaException ", x);
 
                                } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
-                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
-                                                       + x.getMessage());
+                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
 
                                }
 
@@ -156,25 +156,26 @@ public class Kafka011Consumer implements Consumer {
                        future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
                        // second
                } catch (TimeoutException ex) {
+                       log.error("TimeoutException in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        String apiNodeId = null;
                        try {
                                apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
                        } catch (UnknownHostException e1) {
-                               // TODO Auto-generated catch block
-                               log.error("unable to get the localhost address");
+                               log.error("unable to get the localhost address ", e1);
                        }
 
                        try {
                                if (fKafkaLiveLockAvoider != null)
                                        fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
                        } catch (Exception e) {
-                               log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+                               log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup + ")", e);
                        }
 
                        forcePollOnConsumer();
                        future.cancel(true);
                } catch (Exception ex) {
+                       log.error("Exception in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        future.cancel(true);
                }
@@ -307,13 +308,12 @@ public class Kafka011Consumer implements Consumer {
                        // second
                } catch (TimeoutException ex) {
                        // timed out. Try to stop the code if possible.
-                       log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+                       log.info("Timeout occurred - Kafka connection closure within 300 seconds by an Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                } catch (Exception ex) {
                        // timed out. Try to stop the code if possible.
-                       log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
-                                       + ex);
+                       log.error("Exception occurred - Kafka connection closure within 300 seconds by an Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                        return false;
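
The hunks above also show how Kafka011Consumer bounds each consumer.poll() with a Future so a wedged consumer cannot block the request thread past consumerPollTimeOut; on timeout it unlocks the consumer group in the live-lock avoider and cancels the task. A reduced sketch of that bounding pattern (the executor wiring, the timeout value, and the sleep standing in for poll() are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedPollSketch {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Future<Boolean> future = pool.submit(() -> {
                Thread.sleep(10_000);   // stand-in for a consumer.poll() that hangs
                return Boolean.TRUE;
            });
            try {
                future.get(5, TimeUnit.SECONDS);   // plays the role of consumerPollTimeOut
            } catch (TimeoutException ex) {
                // Timed out: interrupt the poll attempt instead of waiting forever.
                future.cancel(true);
            } catch (Exception ex) {
                future.cancel(true);
            } finally {
                pool.shutdownNow();
            }
        }
    }
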
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
index a93ac33..9f8f26d 100644
@@ -72,7 +72,7 @@ public class Kafka011ConsumerUtil {
                                        }
 
                                } catch (Exception e) {
-                                       log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage());
+                                       log.error("Failed and go to Exception block for " + fGroup + " ", e);
                                }
                        }
                });
@@ -108,9 +108,9 @@ public class Kafka011ConsumerUtil {
                                        }
 
                                } catch (java.util.ConcurrentModificationException e) {
-                                       log.error("Error occurs for " + e);
+                                       log.error("ConcurrentModificationException occurred ", e);
                                } catch (Exception e) {
-                                       log.error("Failed and go to Exception block for " + group + " " + e.getMessage());
+                                       log.error("Failed and go to Exception block for " + group + " ", e);
                                }
                        }
                });
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
index a38d77b..04d1d9e 100644
@@ -320,9 +320,9 @@ public class KafkaConsumerCache {
                                curatorConsumerCache.close();
                                log.info("Curator client closed");
                        } catch (ZkInterruptedException e) {
-                               log.warn("Curator client close interrupted: " + e.getMessage());
+                               log.warn("Curator client close interrupted: ", e);
                        } catch (IOException e) {
-                               log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
+                               log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
                        }
 
                        curatorConsumerCache = null;
@@ -497,10 +497,10 @@ public class KafkaConsumerCache {
                        log.info(" ^ deleted " + fBaseZkPath + "/" + key);
                } catch (NoNodeException e) {
                        log.warn("A consumer was deleted from " + fApiId
-                                       + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
+                                       + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
                } catch (Exception e) {
-                       log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
-                       log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage());
+                       log.debug("Unexpected exception while deleting consumer: ", e);
+                       log.info("Unexpected exception while deleting consumer: ", e);
                }
 
                try {
@@ -648,6 +648,7 @@ public class KafkaConsumerCache {
                        try {
                                curator.setData().forPath(consumerPath, fApiId.getBytes());
                        } catch (KeeperException.NoNodeException e) {
+                               log.info("KeeperException.NoNodeException occurred", e);
                                curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
                        }
                        log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
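
The final hunk above logs the NoNodeException raised when an ownership claim targets a consumer path not yet present in ZooKeeper, instead of silently falling through to node creation. The set-or-create idiom it guards looks roughly like this (connect string, path, and owner payload are illustrative; retries and cleanup are elided):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.KeeperException;

    public class ClaimOwnershipSketch {
        public static void main(String[] args) throws Exception {
            CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            curator.start();
            String consumerPath = "/base/group/topic";      // illustrative consumer node
            byte[] owner = "apiNode:3904".getBytes();       // this API node's id
            try {
                curator.setData().forPath(consumerPath, owner);   // usual case: node exists
            } catch (KeeperException.NoNodeException e) {
                // First claim on this path: create the node (and missing parents) instead.
                curator.create().creatingParentsIfNeeded().forPath(consumerPath, owner);
            }
            curator.close();
        }
    }
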
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
index 387b667..2f436ec 100644
@@ -637,7 +637,7 @@ public class EventsServiceImpl implements EventsService {
                                // final KeyedMessage<String, String> data = new
                                // KeyedMessage<String, String>(topic, m.getKey(),
                        
-                               // kms.add(data);
+
                                final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
                                                m.getMessage());
 
@@ -650,7 +650,7 @@ public class EventsServiceImpl implements EventsService {
                                                        + batchId + "]");
                                        try {
                                                // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-                                               // kms);
+
                                                ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
                                                // transactionLogs(batch);
                                                for (message msg : batch) {
@@ -681,7 +681,7 @@ public class EventsServiceImpl implements EventsService {
                                        metricsSet.publishTick(sizeNow);
                                        publishBatchCount = sizeNow;
                                        count += sizeNow;
-                                       // batchId++;
+
                                        String endTime = sdf.format(new Date());
                                        LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
                                                        + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
@@ -698,9 +698,9 @@ public class EventsServiceImpl implements EventsService {
                                                + batchId + "]");
                                try {
                                        // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-                                       // kms);
+
                                        ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
-                                       // transactionLogs(batch);
+
                                        for (message msg : batch) {
                                                LogDetails logDetails = msg.getLogDetails();
                                                LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
@@ -727,7 +727,7 @@ public class EventsServiceImpl implements EventsService {
                                pms.clear();
                                metricsSet.publishTick(sizeNow);
                                count += sizeNow;
-                               // batchId++;
+
                                String endTime = sdf.format(new Date());
                                publishBatchCount = sizeNow;
                                LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
@@ -855,12 +855,12 @@ public class EventsServiceImpl implements EventsService {
                return logDetails;
        }
 
-       /*
-        * public String getMetricsTopic() { return metricsTopic; }
-        * 
-        * public void setMetricsTopic(String metricsTopic) { this.metricsTopic =
-        * metricsTopic; }
-        */
+
+
+
+
+
+