Merge "revert few sonar fixes"
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / Kafka011Consumer.java
index 347f625..4e54a3c 100644 (file)
@@ -34,6 +34,7 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -42,8 +43,7 @@ import org.apache.kafka.common.KafkaException;
 import org.onap.dmaap.dmf.mr.backends.Consumer;
 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
 
-
-
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
 
@@ -77,12 +77,18 @@ public class Kafka011Consumer implements Consumer {
                fId = id;
                fCreateTimeMs = System.currentTimeMillis();
                fLastTouch = fCreateTimeMs;
-               fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+               fPendingMsgs = new LinkedBlockingQueue<>();
                fLogTag = fGroup + "(" + fId + ")/" + fTopic;
                offset = 0;
                state = Kafka011Consumer.State.OPENED;
                kConsumer = cc;
                fKafkaLiveLockAvoider = klla;
+               
+               String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                               "consumer.timeout");
+               if (StringUtils.isNotEmpty(consumerTimeOut)) {
+                       consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
+               }
                synchronized (kConsumer) {
                        kConsumer.subscribe(Arrays.asList(topic));
                }
@@ -107,12 +113,13 @@ public class Kafka011Consumer implements Consumer {
        public synchronized Consumer.Message nextMessage() {
 
                try {
-                       if (fPendingMsgs.size() > 0) {
+                       if (fPendingMsgs.isEmpty()) {
                                return makeMessage(fPendingMsgs.take());
                        }
                } catch (InterruptedException x) {
                        log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")",
                                        x);
+                       //Thread.currentThread().interrupt();
                }
 
                Callable<Boolean> run = new Callable<Boolean>() {
@@ -129,11 +136,10 @@ public class Kafka011Consumer implements Consumer {
                                        }
 
                                } catch (KafkaException x) {
-                                       log.debug(fLogTag + ": KafkaException " + x.getMessage());
+                                       log.debug(fLogTag + ": KafkaException ", x);
 
                                } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) {
-                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. "
-                                                       + x.getMessage());
+                                       log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x);
 
                                }
 
@@ -147,28 +153,29 @@ public class Kafka011Consumer implements Consumer {
                ExecutorService service = Executors.newSingleThreadExecutor();
                service.execute(future);
                try {
-                       future.get(5, TimeUnit.SECONDS); // wait 1
+                       future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1
                        // second
                } catch (TimeoutException ex) {
+               log.error("TimeoutException in in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        String apiNodeId = null;
                        try {
                                apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
                        } catch (UnknownHostException e1) {
-                               // TODO Auto-generated catch block
-                               log.error("unable to get the localhost address");
+                               log.error("unable to get the localhost address ", e1);
                        }
 
                        try {
                                if (fKafkaLiveLockAvoider != null)
                                        fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup);
                        } catch (Exception e) {
-                               log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup);
+                               log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e);
                        }
 
                        forcePollOnConsumer();
                        future.cancel(true);
                } catch (Exception ex) {
+            log.error("Exception in in Kafka consumer ", ex);
                        // timed out. Try to stop the code if possible.
                        future.cancel(true);
                }
@@ -301,13 +308,12 @@ public class Kafka011Consumer implements Consumer {
                        // second
                } catch (TimeoutException ex) {
                        // timed out. Try to stop the code if possible.
-                       log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task");
+                       log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                } catch (Exception ex) {
                        // timed out. Try to stop the code if possible.
-                       log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task"
-                                       + ex);
+                       log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex);
                        future.cancel(true);
                        setState(Kafka011Consumer.State.OPENED);
                        return false;
@@ -370,6 +376,7 @@ public class Kafka011Consumer implements Consumer {
        private long offset;
        private Kafka011Consumer.State state;
        private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider;
+       private int consumerPollTimeOut=5;
        private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class);
        private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs;