fId = id;
fCreateTimeMs = System.currentTimeMillis();
fLastTouch = fCreateTimeMs;
- fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
+ fPendingMsgs = new LinkedBlockingQueue<>();
fLogTag = fGroup + "(" + fId + ")/" + fTopic;
offset = 0;
state = Kafka011Consumer.State.OPENED;
public synchronized Consumer.Message nextMessage() {
try {
- if (fPendingMsgs.size() > 0) {
+ if (fPendingMsgs.isEmpty()) {
return makeMessage(fPendingMsgs.take());
}
} catch (InterruptedException x) {