fUrls = baseUrls;
return this;
}
+
+ public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
+ {
+ fUrls = baseUrls;
+ fServiceName = serviceName;
+ fTransportype = transportype;
+ return this;
+ }
public Builder onTopic(String topic) {
fTopic = topic;
}
private Collection<String> fUrls;
+ private Collection<String> fServiceName;
+ private String fTransportype;
private String fTopic;
private int fMaxBatchSize = 100;
private long fMaxBatchAgeMs = 1000;
public void close() {
try {
final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (remains.size() > 0) {
+ if (remains.isEmpty()) {
getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+ "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
}
} catch (InterruptedException e) {
getLog().warn("Possible message loss. " + e.getMessage(), e);
+ Thread.currentThread().interrupt();
} catch (IOException e) {
getLog().warn("Possible message loss. " + e.getMessage(), e);
}
private synchronized boolean shouldSendNow() {
boolean shouldSend = false;
- if (fPending.size() > 0) {
+ if (fPending.isEmpty()) {
final long nowMs = Clock.now();
shouldSend = (fPending.size() >= fMaxBatchSize);
final long nowMs = Clock.now();
- host = this.fHostSelector.selectBaseHost();
+ if (this.fHostSelector != null) {
+ host = this.fHostSelector.selectBaseHost();
+ }
final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
props.getProperty("partition"));
try {
- /*
- * final String contentType = fCompress ?
- * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
- */
-
+
final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
OutputStream os = baseStream;
final String contentType = props.getProperty("contenttype");
private void DME2Configue() throws Exception {
try {
- /*
- * FileReader reader = new FileReader(new File (producerFilePath));
- * Properties props = new Properties(); props.load(reader);
- */
latitude = props.getProperty("Latitude");
longitude = props.getProperty("Longitude");
version = props.getProperty("Version");
partner = props.getProperty("Partner");
routeOffer = props.getProperty("routeOffer");
subContextPath = props.getProperty("SubContextPath") + fTopic;
- /*
- * if(props.getProperty("partition")!=null &&
- * !props.getProperty("partition").equalsIgnoreCase("")){
- * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
- * "partition"); }
- */
+
protocol = props.getProperty("Protocol");
methodType = props.getProperty("MethodType");
dmeuser = props.getProperty("username");
// SSL changes
// System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
- // "SSLv3,TLSv1,TLSv1.1");
+
System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
sender.setSubContext(subContextPath);
sender.setCredentials(dmeuser, dmepassword);
sender.setHeaders(DMETimeOuts);
- if (handlers.equalsIgnoreCase("yes")) {
+ if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",