X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fbackends%2Fkafka%2FKafka011Consumer.java;h=67a8af930d5c0fb30a45ffd70f90f4dec234ee11;hb=ab3015141a2cb42308f1d09dc3a7edc8c10cf5c7;hp=4e54a3c659755a3bb1e82157a203981ec4e05817;hpb=7823ae72abf905f77842d533374b7f2ebb7c0b34;p=dmaap%2Fmessagerouter%2Fmsgrtr.git diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java index 4e54a3c..67a8af9 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -113,13 +113,13 @@ public class Kafka011Consumer implements Consumer { public synchronized Consumer.Message nextMessage() { try { - if (fPendingMsgs.isEmpty()) { + if (!fPendingMsgs.isEmpty()) { return makeMessage(fPendingMsgs.take()); } } catch (InterruptedException x) { log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")", x); - //Thread.currentThread().interrupt(); + Thread.currentThread().interrupt(); } Callable run = new Callable() {