private String fTransportype;
private String fTopic;
private int fMaxBatchSize = 100;
+
private long fMaxBatchAgeMs = 1000;
private boolean fCompress = false;
private int threadOccuranceTime = 50;
@Override
public int send(message msg) {
- final LinkedList<message> list = new LinkedList<message>();
+ final LinkedList<message> list = new LinkedList<>();
list.add(msg);
return send(list);
}
try {
final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
if (remains.isEmpty()) {
- getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
+ getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
+ remains.size());
}
} catch (InterruptedException e) {
getLog().warn("Possible message loss. " + e.getMessage(), e);
}
synchronized (this) {
- final LinkedList<message> result = new LinkedList<message>();
+ final LinkedList<message> result = new LinkedList<>();
fPending.drainTo(result);
return result;
}
* @param force
*/
private synchronized void send(boolean force) {
- if (force || shouldSendNow()) {
- if (!sendBatch()) {
+ if ((force || shouldSendNow()) && !sendBatch()) {
getLog().warn("Send failed, " + fPending.size() + " message to send.");
// note the time for back-off
fDontSendUntilMs = sfWaitAfterError + Clock.now();
- }
}
}
private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if (fPending.isEmpty()) {
+ if (fPending.size()>0) {
final long nowMs = Clock.now();
shouldSend = (fPending.size() >= fMaxBatchSize);
private synchronized boolean sendBatch() {
// it's possible for this call to be made with an empty list. in this
// case, just return.
- if (fPending.size() < 1) {
+ if (fPending.isEmpty()) {
return true;
}
DME2Configue();
Thread.sleep(5);
- getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info(String
+ .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
+ nowMs - fPending.peek().timestamp));
sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
getLog().info(logLine);
fPending.clear();
return true;
}
- if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
- final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
- username, password, protocolFlag);
- // Here we are checking for error response. If HTTP status
- // code is not within the http success response code
- // then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
- return false;
- }
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
- fPending.clear();
- return true;
- }
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
+ final JSONObject result =
+ postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
+ .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
+ .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
protocolFlag);
}
if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
// Here we are checking for error response. If HTTP status
fPending.clear();
return true;
}
- } catch (IllegalArgumentException x) {
- getLog().warn(x.getMessage(), x);
- } catch (IOException x) {
- getLog().warn(x.getMessage(), x);
- } catch (HttpException x) {
- getLog().warn(x.getMessage(), x);
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
}
DME2Configue();
Thread.sleep(5);
- getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
+ nowMs - fPending.peek().timestamp);
sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
}
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
authDate, username, password, protocolFlag);
// Here we are checking for error response. If HTTP status
}
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
password, protocolFlag);
}
if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
// Here we are checking for error response. If HTTP status
fPending.clear();
return pubResponse;
}
- } catch (IllegalArgumentException x) {
+ } catch (IllegalArgumentException | HttpException x) {
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
-
- } catch (HttpException x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage(x.getMessage());
-
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
}
finally {
- if (fPending.size() > 0) {
+ if (!fPending.isEmpty()) {
getLog().warn("Send failed, " + fPending.size() + " message to send.");
pubResponse.setPendingMsgs(fPending.size());
}
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fPending = new LinkedBlockingQueue<>();
fDontSendUntilMs = 0;
fExec = new ScheduledThreadPoolExecutor(1);
pubResponse = new MRPublisherResponse();
fMaxBatchAgeMs = maxBatchAgeMs;
fCompress = compress;
threadOccuranceTime = httpThreadOccurnace;
- fPending = new LinkedBlockingQueue<TimestampedMessage>();
+ fPending = new LinkedBlockingQueue<>();
fDontSendUntilMs = 0;
fExec = new ScheduledThreadPoolExecutor(1);
fExec.scheduleAtFixedRate(new Runnable() {