public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
+ private static final String PROPS_PROTOCOL = "Protocol";
+ private static final String PROPS_PARTITION = "partition";
+ private static final String PROPS_CONTENT_TYPE = "contenttype";
+
+ private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip";
+ private static final String CONTENT_TYPE_CAMBRIA = "application/cambria";
+ private static final String CONTENT_TYPE_JSON = "application/json";
+ private static final String CONTENT_TYPE_TEXT = "text/plain";
+
+ private static final String JSON_STATUS = "status";
+
public static class Builder {
- public Builder() {
- }
public Builder againstUrls(Collection<String> baseUrls) {
fUrls = baseUrls;
private String fTransportype;
private String fTopic;
private int fMaxBatchSize = 100;
+
private long fMaxBatchAgeMs = 1000;
private boolean fCompress = false;
private int threadOccuranceTime = 50;
try {
final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
if (remains.isEmpty()) {
- getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
- + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
+ getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
+ remains.size());
}
} catch (InterruptedException e) {
getLog().warn("Possible message loss. " + e.getMessage(), e);
private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if (fPending.isEmpty()) {
+ if (fPending.size()>0) {
final long nowMs = Clock.now();
shouldSend = (fPending.size() >= fMaxBatchSize);
return jsonArray;
}
+ private void logTime(long startMs, String dmeResponse) {
+ if (getLog().isInfoEnabled()) {
+ getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse);
+ }
+ }
+
private synchronized boolean sendBatch() {
// it's possible for this call to be made with an empty list. in this
// case, just return.
host = this.fHostSelector.selectBaseHost();
}
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+ props.getProperty(PROPS_PARTITION));
try {
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
OutputStream os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
+ final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
JSONArray jsonArray = parseJSON();
os.write(jsonArray.toString().getBytes());
os.close();
- } else if (contentType.equalsIgnoreCase("text/plain")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
for (TimestampedMessage m : fPending) {
os.write(m.fMsg.getBytes());
os.write('\n');
}
os.close();
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+ || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
os = new GZIPOutputStream(baseStream);
}
for (TimestampedMessage m : fPending) {
DME2Configue();
- Thread.sleep(5);
- getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ this.wait(5);
+ getLog().info(String
+ .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
+ nowMs - fPending.peek().timestamp));
sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
- getLog().info(logLine);
+ logTime(startMs, dmeResponse);
fPending.clear();
return true;
}
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final JSONObject result =
postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
.setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
protocolFlag);
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
// Here we are checking for error response. If HTTP status
// code is not within the http success response code
// then we consider this as error and return false
- if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
return false;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result.toString());
fPending.clear();
return true;
}
host = this.fHostSelector.selectBaseHost();
- final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
- props.getProperty("partition"));
+ final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
+ props.getProperty(PROPS_PARTITION));
OutputStream os = null;
try {
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
os = baseStream;
- final String contentType = props.getProperty("contenttype");
- if (contentType.equalsIgnoreCase("application/json")) {
+ final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
JSONArray jsonArray = parseJSON();
os.write(jsonArray.toString().getBytes());
- } else if (contentType.equalsIgnoreCase("text/plain")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
for (TimestampedMessage m : fPending) {
os.write(m.fMsg.getBytes());
os.write('\n');
}
- } else if (contentType.equalsIgnoreCase("application/cambria")
- || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
- if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+ } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
+ || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
+ if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
os = new GZIPOutputStream(baseStream);
}
for (TimestampedMessage m : fPending) {
try {
DME2Configue();
- Thread.sleep(5);
- getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ this.wait(5);
+ getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
+ nowMs - fPending.peek().timestamp);
sender.setPayload(os.toString());
String dmeResponse = sender.sendAndWait(5000L);
}
if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
authDate, username, password, protocolFlag);
// Here we are checking for error response. If HTTP status
return pubResponse;
}
- final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
- getLog().info(logLine);
+ logTime(startMs, result);
fPending.clear();
return pubResponse;
}
if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
password, protocolFlag);
}
if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
- getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
- + (nowMs - fPending.peek().timestamp) + " ms");
+ getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
+ nowMs - fPending.peek().timestamp);
final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
// Here we are checking for error response. If HTTP status
fPending.clear();
return pubResponse;
}
- } catch (IllegalArgumentException x) {
+ } catch (IllegalArgumentException | HttpException x) {
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
pubResponse.setResponseMessage(x.getMessage());
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
pubResponse.setResponseMessage(x.getMessage());
-
- } catch (HttpException x) {
- getLog().warn(x.getMessage(), x);
- pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
- pubResponse.setResponseMessage(x.getMessage());
-
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
mrPubResponse.setResponseMessage("Please verify the Producer properties");
} else if (reply.startsWith("{")) {
JSONObject jObject = new JSONObject(reply);
- if (jObject.has("message") && jObject.has("status")) {
+ if (jObject.has("message") && jObject.has(JSON_STATUS)) {
String message = jObject.getString("message");
if (null != message) {
mrPubResponse.setResponseMessage(message);
}
- mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+ mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
} else {
mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
mrPubResponse.setResponseMessage(reply);
public void setProps(Properties props) {
this.props = props;
+ setClientConfig(DmaapClientUtil.getClientConfig(props));
}
public String getProtocolFlag() {
routeOffer = props.getProperty("routeOffer");
subContextPath = props.getProperty("SubContextPath") + fTopic;
- protocol = props.getProperty("Protocol");
+ protocol = props.getProperty(PROPS_PROTOCOL);
methodType = props.getProperty("MethodType");
dmeuser = props.getProperty("username");
dmepassword = props.getProperty("password");
- contentType = props.getProperty("contenttype");
+ contentType = props.getProperty(PROPS_CONTENT_TYPE);
handlers = props.getProperty("sessionstickinessrequired");
routerFilePath = props.getProperty("DME2preferredRouterFilePath");
* routeOffer value for auto failover within a cluster
*/
- String partitionKey = props.getProperty("partition");
+ String partitionKey = props.getProperty(PROPS_PARTITION);
if (partner != null && !partner.isEmpty()) {
url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="