@Override
public int send(message msg) {
- final LinkedList<message> list = new LinkedList<message>();
+ final LinkedList<message> list = new LinkedList<>();
list.add(msg);
return send(list);
}
}
synchronized (this) {
- final LinkedList<message> result = new LinkedList<message>();
+ final LinkedList<message> result = new LinkedList<>();
fPending.drainTo(result);
return result;
}
* @param force
*/
private synchronized void send(boolean force) {
- if (force || shouldSendNow()) {
- if (!sendBatch()) {
+ if ((force || shouldSendNow()) && !sendBatch()) {
getLog().warn("Send failed, " + fPending.size() + " message to send.");
// note the time for back-off
fDontSendUntilMs = sfWaitAfterError + Clock.now();
- }
}
}
private synchronized boolean sendBatch() {
// it's possible for this call to be made with an empty list. in this
// case, just return.
- if (fPending.size() < 1) {
+ if (fPending.isEmpty()) {
return true;
}
sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
getLog().info(logLine);
fPending.clear();
return true;
fPending.clear();
return true;
}
- } catch (IllegalArgumentException x) {
- getLog().warn(x.getMessage(), x);
- } catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- } catch (HttpException x) {
- getLog().warn(x.getMessage(), x);
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
}
}
finally {
- if (fPending.size() > 0) {
+ if (!fPending.isEmpty()) {
getLog().warn("Send failed, " + fPending.size() + " message to send.");
pubResponse.setPendingMsgs(fPending.size());
}
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fPending = new LinkedBlockingQueue<>();
fDontSendUntilMs = 0;
fExec = new ScheduledThreadPoolExecutor(1);
pubResponse = new MRPublisherResponse();
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
threadOccuranceTime = httpThreadOccurnace;
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fPending = new LinkedBlockingQueue<>();
fDontSendUntilMs = 0;
fExec = new ScheduledThreadPoolExecutor(1);
fExec.scheduleAtFixedRate(new Runnable() {