fUrls = baseUrls;
return this;
}
+
+ public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
+ {
+ fUrls = baseUrls;
+ fServiceName = serviceName;
+ fTransportype = transportype;
+ return this;
+ }
public Builder onTopic(String topic) {
fTopic = topic;
return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
fAllowSelfSignedCerts, threadOccuranceTime);
} catch (MalformedURLException e) {
- throw new RuntimeException(e);
+ throw new IllegalArgumentException(e);
}
} else {
try {
return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
fAllowSelfSignedCerts, fMaxBatchSize);
} catch (MalformedURLException e) {
- throw new RuntimeException(e);
+ throw new IllegalArgumentException(e);
}
}
}
private Collection<String> fUrls;
+ private Collection<String> fServiceName;
+ private String fTransportype;
private String fTopic;
private int fMaxBatchSize = 100;
private long fMaxBatchAgeMs = 1000;
public void close() {
try {
final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.size() > 0) {
+ if (remains.isEmpty()) {
getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
}
} catch (InterruptedException e) {
getLog().warn("Possible message loss. " + e.getMessage(), e);
+ Thread.currentThread().interrupt();
} catch (IOException e) {
getLog().warn("Possible message loss. " + e.getMessage(), e);
}
private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if (fPending.size() > 0) {
+ if (fPending.isEmpty()) {
final long nowMs = Clock.now();
shouldSend = (fPending.size() >= fMaxBatchSize);
final long nowMs = Clock.now();
- host = this.fHostSelector.selectBaseHost();
+ if (this.fHostSelector != null) {
+ host = this.fHostSelector.selectBaseHost();
+ }
final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
props.getProperty("partition"));
try {
- /*
- * final String contentType = fCompress ?
- * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
- */
-
+
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
OutputStream os = baseStream;
final String contentType = props.getProperty("contenttype");
fPending.clear();
return true;
}
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
} catch (IllegalArgumentException x) {
getLog().warn(x.getMessage(), x);
} catch (IOException x) {
fPending.clear();
return pubResponse;
}
+
+ if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+ + (nowMs - fPending.peek().timestamp) + " ms");
+ final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
+
+ // Here we are checking for error response. If HTTP status
+ // code is not within the http success response code
+ // then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result, pubResponse);
+
+ if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+ || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = String.valueOf((Clock.now() - startMs));
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
} catch (IllegalArgumentException x) {
getLog().warn(x.getMessage(), x);
pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
return pubResponse;
}
- private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+ public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
if (reply.isEmpty()) {
private void DME2Configue() throws Exception {
try {
- /*
- * FileReader reader = new FileReader(new File (producerFilePath));
- * Properties props = new Properties(); props.load(reader);
- */
latitude = props.getProperty("Latitude");
longitude = props.getProperty("Longitude");
version = props.getProperty("Version");
partner = props.getProperty("Partner");
routeOffer = props.getProperty("routeOffer");
subContextPath = props.getProperty("SubContextPath") + fTopic;
- /*
- * if(props.getProperty("partition")!=null &&
- * !props.getProperty("partition").equalsIgnoreCase("")){
- * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
- * "partition"); }
- */
+
protocol = props.getProperty("Protocol");
methodType = props.getProperty("MethodType");
dmeuser = props.getProperty("username");
// SSL changes
// System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
- // "SSLv3,TLSv1,TLSv1.1");
+
System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
sender.setSubContext(subContextPath);
sender.setCredentials(dmeuser, dmepassword);
sender.setHeaders(DMETimeOuts);
- if (handlers.equalsIgnoreCase("yes")) {
+ if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
} catch (Exception x) {
getLog().warn(x.getMessage(), x);
- throw new Exception(x.getMessage());
+ throw new IllegalArgumentException(x.getMessage());
}
}