Merge "Blocker sonar fixes - DMaaPKafkaMetaBroker"
author sunil unnava <su622b@att.com>
Mon, 11 Feb 2019 14:28:03 +0000 (14:28 +0000)
committer Gerrit Code Review <gerrit@onap.org>
Mon, 11 Feb 2019 14:28:03 +0000 (14:28 +0000)
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java
src/main/java/org/onap/dmaap/dmf/mr/utils/ConfigurationReader.java

index b66a251..b3f3c8f 100644 (file)
@@ -119,6 +119,7 @@ public class Kafka011Consumer implements Consumer {
                } catch (InterruptedException x) {
                        log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
                                        x);
+                       Thread.currentThread().interrupt();
                }
 
                Callable<Boolean> run = new Callable<Boolean>() {
@@ -135,11 +136,10 @@ public class Kafka011Consumer implements Consumer {
                                        }
 
                                } catch (KafkaException x) {
-                                       log.debug(fLogTag + ": KafkaException " + x.getMessage());
+                                       log.debug(fLogTag + ": KafkaException ", x);
 
                                } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
-                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
-                                                       + x.getMessage());
+                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
 
                                }
 
@@ -156,25 +156,26 @@ public class Kafka011Consumer implements Consumer {
                        future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
                        // second
                } catch (TimeoutException ex) {
+                       log.error("TimeoutException in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        String apiNodeId = null;
                        try {
                                apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
                        } catch (UnknownHostException e1) {
-                               // TODO Auto-generated catch block
-                               log.error("unable to get the localhost address");
+                               log.error("unable to get the localhost address ", e1);
                        }
 
                        try {
                                if (fKafkaLiveLockAvoider != null)
                                        fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
                        } catch (Exception e) {
-                               log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+                               log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
                        }
 
                        forcePollOnConsumer();
                        future.cancel(true);
                } catch (Exception ex) {
+                       log.error("Exception in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        future.cancel(true);
                }
@@ -307,13 +308,12 @@ public class Kafka011Consumer implements Consumer {
                        // second
                } catch (TimeoutException ex) {
                        // timed out. Try to stop the code if possible.
-                       log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+                       log.info("Timeout occurred - Kafka connection closure within 300 seconds by an Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                } catch (Exception ex) {
                        // timed out. Try to stop the code if possible.
-                       log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
-                                       + ex);
+                       log.error("Exception occurred - Kafka connection closure within 300 seconds by an Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                        return false;
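
Note: the recurring fix in Kafka011Consumer.java is twofold. When an InterruptedException is caught and swallowed, the interrupt flag is restored with Thread.currentThread().interrupt() (Sonar rule S2142), and the caught exception object is passed to the logger as the last argument instead of concatenating getMessage(), so the stack trace is preserved. Below is a minimal, self-contained sketch of the interrupt pattern; the class name, queue, and SLF4J logger are illustrative and not the actual DMaaP code:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class InterruptAwareTake {
        private static final Logger log = LoggerFactory.getLogger(InterruptAwareTake.class);
        private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

        public String takeNext() {
            try {
                return pending.take();
            } catch (InterruptedException x) {
                // S2142: either rethrow, or restore the flag so callers further up
                // the stack can still see that an interrupt was requested.
                log.warn("pending take() interrupted; restoring interrupt flag", x);
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }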
index a93ac33..9f8f26d 100644 (file)
@@ -72,7 +72,7 @@ public class Kafka011ConsumerUtil {
                                        }
 
                                } catch (Exception e) {
-                                       log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage());
+                                       log.error("Failed and go to Exception block for " + fGroup +" ", e);
                                }
                        }
                });
@@ -108,9 +108,9 @@ public class Kafka011ConsumerUtil {
                                        }
 
                                } catch (java.util.ConcurrentModificationException e) {
-                                       log.error("Error occurs for " + e);
+                                       log.error("Error occurs for ", e);
                                } catch (Exception e) {
-                                       log.error("Failed and go to Exception block for " + group + " " + e.getMessage());
+                                       log.error("Failed and go to Exception block for " + group + " ", e);
                                }
                        }
                });
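
Note: the same logging correction applies in Kafka011ConsumerUtil.java. Concatenating e.getMessage() drops the stack trace and can append "null" when the exception carries no message; passing the Throwable as the final logger argument keeps both. A hedged sketch using an SLF4J-style logger (the project's EELFLogger is called with the same (String, Throwable) shape in this diff); the method and parameter names are made up for illustration:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class OffsetResetLogging {
        private static final Logger log = LoggerFactory.getLogger(OffsetResetLogging.class);

        void resetOffsets(String group, Runnable resetAction) {
            try {
                resetAction.run();
            } catch (Exception e) {
                // Avoid: log.error("Failed for " + group + " " + e.getMessage());
                // Prefer: pass the exception itself so the full trace is logged.
                log.error("Offset reset failed for group " + group, e);
            }
        }
    }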
index a38d77b..04d1d9e 100644 (file)
@@ -320,9 +320,9 @@ public class KafkaConsumerCache {
                                curatorConsumerCache.close();
                                log.info("Curator client closed");
                        } catch (ZkInterruptedException e) {
-                               log.warn("Curator client close interrupted: " + e.getMessage());
+                               log.warn("Curator client close interrupted: ", e);
                        } catch (IOException e) {
-                               log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage());
+                               log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e);
                        }
 
                        curatorConsumerCache = null;
@@ -497,10 +497,10 @@ public class KafkaConsumerCache {
                        log.info(" ^ deleted " + fBaseZkPath + "/" + key);
                } catch (NoNodeException e) {
                        log.warn("A consumer was deleted from " + fApiId
-                                       + "'s cache, but no Cambria API node had ownership of it in ZooKeeper");
+                                       + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e);
                } catch (Exception e) {
-                       log.debug("Unexpected exception while deleting consumer: " + e.getMessage());
-                       log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage());
+                       log.debug("Unexpected exception while deleting consumer: ", e);
+                       log.info(" %%%%%%@# Unexpected exception while deleting consumer: ", e);
                }
 
                try {
@@ -648,6 +648,7 @@ public class KafkaConsumerCache {
                        try {
                                curator.setData().forPath(consumerPath, fApiId.getBytes());
                        } catch (KeeperException.NoNodeException e) {
+                               log.info("KeeperException.NoNodeException occurred", e);
                                curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes());
                        }
                        log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey);
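
Note: in KafkaConsumerCache the previously silent catch of KeeperException.NoNodeException now logs before falling back to creating the node, which is the usual Curator set-or-create idiom. A rough sketch of that idiom under stated assumptions (a started CuratorFramework client, an SLF4J-style logger); the ZooKeeper connection string, path, and payload are illustrative only:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.KeeperException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ZkOwnershipWriter {
        private static final Logger log = LoggerFactory.getLogger(ZkOwnershipWriter.class);

        public static void main(String[] args) throws Exception {
            CuratorFramework curator =
                    CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
            curator.start();
            writeOwner(curator, "/consumers/owner/example-key", "api-node-1".getBytes());
            curator.close();
        }

        static void writeOwner(CuratorFramework curator, String path, byte[] owner) throws Exception {
            try {
                curator.setData().forPath(path, owner);
            } catch (KeeperException.NoNodeException e) {
                // First write for this key: the node does not exist yet, so create it
                // (and any missing parents) instead of silently swallowing the exception.
                log.info("Node " + path + " not found; creating it", e);
                curator.create().creatingParentsIfNeeded().forPath(path, owner);
            }
        }
    }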
index 19e8e65..f71bda0 100644 (file)
@@ -46,15 +46,15 @@ import org.onap.dmaap.dmf.mr.utils.Utils;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 
-/*import com.att.sa.highlandPark.config.HpConfigContext;
-import com.att.sa.highlandPark.config.HpReaderException;
-import com.att.sa.highlandPark.events.HpJsonEvent;
-import com.att.sa.highlandPark.events.HpJsonEventFactory;
-import com.att.sa.highlandPark.processor.HpAlarmFilter;
-import com.att.sa.highlandPark.processor.HpEvent;
-import com.att.sa.highlandPark.processor.HpProcessingEngine;
-import com.att.sa.highlandPark.processor.HpProcessingEngine.EventFactory;
-*/
+
+
+
+
+
+
+
+
+
 /**
  * class used to write the consumed messages
  * 
@@ -173,7 +173,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
                // public Builder atOffset ( int pos )
                
        
-               // return this;
+               
                // }
                /**
                 * method returs object of CambriaOutboundEventStream
@@ -202,26 +202,26 @@ public class CambriaOutboundEventStream implements StreamWriter {
                fPretty = builder.fPretty;
                fWithMeta = builder.fWithMeta;
                fKafkaConsumerList = builder.fKafkaConsumerList;
-       /*      if (CambriaConstants.kNoFilter.equals(builder.fTopicFilter)) {
-                       fHpAlarmFilter = null;
-                       fHppe = null;
-               } else {
-                       try {
-                               final JSONObject filter = new JSONObject(new JSONTokener(builder.fTopicFilter));
-                               HpConfigContext<HpEvent> cc = new HpConfigContext<HpEvent>();
-                               fHpAlarmFilter = cc.create(HpAlarmFilter.class, filter);
-                               final EventFactory<HpJsonEvent> ef = new HpJsonEventFactory();
-                               fHppe = new HpProcessingEngine<HpJsonEvent>(ef);
-                       } catch (HpReaderException e) {
-                               // JSON was okay, but the filter engine says it's bogus
-                               throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
-                                               "Couldn't create filter: " + e.getMessage());
-                       } catch (JSONException e) {
-                               // user sent a bogus JSON object
-                               throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
-                                               "Couldn't parse JSON: " + e.getMessage());
-                       }
-               }*/
+       
+                       
+                       
+               
+                       
+                               
+                               
+                               
+                               
+                               
+                       
+                               
+                       
+                                               
+                       
+                               
+                               
+                                       
+               
+       
        }
 
        /**
@@ -265,7 +265,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
        public void write(final OutputStream os) throws IOException {
                
        
-               // final boolean transactionEnabled = istransEnable;
+               
                // synchronized(this){
                os.write('[');
                fSent = forEachMessage(new operation() {
@@ -539,7 +539,7 @@ public class CambriaOutboundEventStream implements StreamWriter {
        private final boolean fWithMeta;
        private int fSent;
 
-       //private final HpProcessingEngine<HpJsonEvent> fHppe;
+       
        private DMaaPContext dmaapContext;
        private String responseTransactionId;
        private Topic topic;
index 3462567..800bb0f 100644 (file)
@@ -188,7 +188,7 @@ public class ConfigurationReader {
                                log.info("Broker Type is:" + CambriaConstants.kBrokerType_Memory);
                                fPublisher = new MemoryQueuePublisher(q, mmb);
                                //Ramkumar remove below
-                       //      fMetaBroker = mmb;
+                       
                                fConsumerFactory = new MemoryConsumerFactory(q);
                        } else {
                                throw new IllegalArgumentException(
@@ -316,18 +316,18 @@ public class ConfigurationReader {
         * 
         * @return
         */
-       /*public rrNvReadable getSettings() {
-               return settings;
-       }*/
+       
+               
+       
 
        /**
         * method to set rrNvReadable object
         * 
         * @param settings
         */
-       /*public void setSettings(rrNvReadable settings) {
-               this.settings = settings;
-       }*/
+       
+       
+       
 
        /**
         * method to get CuratorFramework object
@@ -451,7 +451,7 @@ public class ConfigurationReader {
         * @return
         */
        public static String getMainZookeeperConnectionString() {
-               //return settings.getString(CambriaConstants.kSetting_ZkConfigDbServers,                        CambriaConstants.kDefault_ZkConfigDbServers);
+               
                
                 String zkServername = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
                 if (zkServername==null) zkServername=CambriaConstants.kDefault_ZkConfigDbServers;