Fix sonar issues in dmaap-messagerouter-msgrtr
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / Kafka011Consumer.java
index b66a251..b3f3c8f 100644 (file)
@@ -119,6 +119,7 @@ public class Kafka011Consumer implements Consumer {
                } catch (InterruptedException x) {
                        log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
                                        x);
+                       Thread.currentThread().interrupt();
                }
 
                Callable<Boolean> run = new Callable<Boolean>() {
@@ -135,11 +136,10 @@ public class Kafka011Consumer implements Consumer {
                                        }
 
                                } catch (KafkaException x) {
-                                       log.debug(fLogTag + ": KafkaException " + x.getMessage());
+                                       log.debug(fLogTag + ": KafkaException ", x);
 
                                } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
-                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
-                                                       + x.getMessage());
+                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
 
                                }
 
@@ -156,25 +156,26 @@ public class Kafka011Consumer implements Consumer {
                        future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
                        // second
                } catch (TimeoutException ex) {
+               log.error("TimeoutException in in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        String apiNodeId = null;
                        try {
                                apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
                        } catch (UnknownHostException e1) {
-                               // TODO Auto-generated catch block
-                               log.error("unable to get the localhost address");
+                               log.error("unable to get the localhost address ", e1);
                        }
 
                        try {
                                if (fKafkaLiveLockAvoider != null)
                                        fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
                        } catch (Exception e) {
-                               log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+                               log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
                        }
 
                        forcePollOnConsumer();
                        future.cancel(true);
                } catch (Exception ex) {
+            log.error("Exception in in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        future.cancel(true);
                }
@@ -307,13 +308,12 @@ public class Kafka011Consumer implements Consumer {
                        // second
                } catch (TimeoutException ex) {
                        // timed out. Try to stop the code if possible.
-                       log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+                       log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                } catch (Exception ex) {
                        // timed out. Try to stop the code if possible.
-                       log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
-                                       + ex);
+                       log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                        return false;