1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
41 import javax.ws.rs.core.MultivaluedMap;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import com.att.nsa.mr.client.HostSelector;
55 import com.att.nsa.mr.client.MRBatchingPublisher;
56 import com.att.nsa.mr.client.response.MRPublisherResponse;
57 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60 private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
62 public static class Builder {
66 public Builder againstUrls(Collection<String> baseUrls) {
71 public Builder onTopic(String topic) {
76 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
77 fMaxBatchSize = maxBatchSize;
78 fMaxBatchAgeMs = maxBatchAgeMs;
82 public Builder compress(boolean compress) {
87 public Builder httpThreadTime(int threadOccuranceTime) {
88 this.threadOccuranceTime = threadOccuranceTime;
92 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
93 fAllowSelfSignedCerts = allowSelfSignedCerts;
97 public Builder withResponse(boolean withResponse) {
98 fWithResponse = withResponse;
102 public MRSimplerBatchPublisher build() {
103 if (!fWithResponse) {
105 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
106 fAllowSelfSignedCerts, threadOccuranceTime);
107 } catch (MalformedURLException e) {
108 throw new IllegalArgumentException(e);
112 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
113 fAllowSelfSignedCerts, fMaxBatchSize);
114 } catch (MalformedURLException e) {
115 throw new IllegalArgumentException(e);
121 private Collection<String> fUrls;
122 private String fTopic;
123 private int fMaxBatchSize = 100;
124 private long fMaxBatchAgeMs = 1000;
125 private boolean fCompress = false;
126 private int threadOccuranceTime = 50;
127 private boolean fAllowSelfSignedCerts = false;
128 private boolean fWithResponse = false;
133 public int send(String partition, String msg) {
134 return send(new message(partition, msg));
138 public int send(String msg) {
139 return send(new message(null, msg));
143 public int send(message msg) {
144 final LinkedList<message> list = new LinkedList<message>();
150 public synchronized int send(Collection<message> msgs) {
152 throw new IllegalStateException("The publisher was closed.");
155 for (message userMsg : msgs) {
156 fPending.add(new TimestampedMessage(userMsg));
158 return getPendingMessageCount();
162 public synchronized int getPendingMessageCount() {
163 return fPending.size();
167 public void close() {
169 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
170 if (remains.size() > 0) {
171 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
172 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
174 } catch (InterruptedException e) {
175 getLog().warn("Possible message loss. " + e.getMessage(), e);
176 Thread.currentThread().interrupt();
177 } catch (IOException e) {
178 getLog().warn("Possible message loss. " + e.getMessage(), e);
183 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
184 synchronized (this) {
187 // stop the background sender
188 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
189 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
193 final long now = Clock.now();
194 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
195 final long timeoutAtMs = now + waitInMs;
197 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
202 synchronized (this) {
203 final LinkedList<message> result = new LinkedList<message>();
204 fPending.drainTo(result);
210 * Possibly send a batch to the MR server. This is called by the background
211 * thread and the close() method
215 private synchronized void send(boolean force) {
216 if (force || shouldSendNow()) {
218 getLog().warn("Send failed, " + fPending.size() + " message to send.");
220 // note the time for back-off
221 fDontSendUntilMs = sfWaitAfterError + Clock.now();
226 private synchronized boolean shouldSendNow() {
227 boolean shouldSend = false;
228 if (fPending.size() > 0) {
229 final long nowMs = Clock.now();
231 shouldSend = (fPending.size() >= fMaxBatchSize);
233 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
234 shouldSend = sendAtMs <= nowMs;
237 // however, wait after an error
238 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
244 * Method to parse published JSON Objects and Arrays
248 private JSONArray parseJSON() {
249 JSONArray jsonArray = new JSONArray();
250 for (TimestampedMessage m : fPending) {
251 JSONTokener jsonTokener = new JSONTokener(m.fMsg);
252 JSONObject jsonObject = null;
253 JSONArray tempjsonArray = null;
254 final char firstChar = jsonTokener.next();
256 if ('[' == firstChar) {
257 tempjsonArray = new JSONArray(jsonTokener);
258 if (null != tempjsonArray) {
259 for (int i = 0; i < tempjsonArray.length(); i++) {
260 jsonArray.put(tempjsonArray.getJSONObject(i));
264 jsonObject = new JSONObject(jsonTokener);
265 jsonArray.put(jsonObject);
272 private synchronized boolean sendBatch() {
273 // it's possible for this call to be made with an empty list. in this
274 // case, just return.
275 if (fPending.size() < 1) {
279 final long nowMs = Clock.now();
281 host = this.fHostSelector.selectBaseHost();
283 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
284 props.getProperty("partition"));
288 * final String contentType = fCompress ?
289 * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
292 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
293 OutputStream os = baseStream;
294 final String contentType = props.getProperty("contenttype");
295 if (contentType.equalsIgnoreCase("application/json")) {
296 JSONArray jsonArray = parseJSON();
297 os.write(jsonArray.toString().getBytes());
300 } else if (contentType.equalsIgnoreCase("text/plain")) {
301 for (TimestampedMessage m : fPending) {
302 os.write(m.fMsg.getBytes());
306 } else if (contentType.equalsIgnoreCase("application/cambria")
307 || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
308 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
309 os = new GZIPOutputStream(baseStream);
311 for (TimestampedMessage m : fPending) {
313 os.write(("" + m.fPartition.length()).getBytes());
315 os.write(("" + m.fMsg.length()).getBytes());
317 os.write(m.fPartition.getBytes());
318 os.write(m.fMsg.getBytes());
323 for (TimestampedMessage m : fPending) {
324 os.write(m.fMsg.getBytes());
330 final long startMs = Clock.now();
331 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
336 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
337 + (nowMs - fPending.peek().timestamp) + " ms");
338 sender.setPayload(os.toString());
339 String dmeResponse = sender.sendAndWait(5000L);
341 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
342 getLog().info(logLine);
347 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
348 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
349 + (nowMs - fPending.peek().timestamp) + " ms");
350 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
351 username, password, protocolFlag);
352 // Here we are checking for error response. If HTTP status
353 // code is not within the http success response code
354 // then we consider this as error and return false
355 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
358 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
359 getLog().info(logLine);
364 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
365 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
366 + (nowMs - fPending.peek().timestamp) + " ms");
367 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
370 // Here we are checking for error response. If HTTP status
371 // code is not within the http success response code
372 // then we consider this as error and return false
373 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
376 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
377 getLog().info(logLine);
382 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
383 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
384 + (nowMs - fPending.peek().timestamp) + " ms");
385 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
387 // Here we are checking for error response. If HTTP status
388 // code is not within the http success response code
389 // then we consider this as error and return false
390 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
393 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
394 getLog().info(logLine);
398 } catch (IllegalArgumentException x) {
399 getLog().warn(x.getMessage(), x);
400 } catch (IOException x) {
401 getLog().warn(x.getMessage(), x);
402 } catch (HttpException x) {
403 getLog().warn(x.getMessage(), x);
404 } catch (Exception x) {
405 getLog().warn(x.getMessage(), x);
410 public synchronized MRPublisherResponse sendBatchWithResponse() {
411 // it's possible for this call to be made with an empty list. in this
412 // case, just return.
413 if (fPending.size() < 1) {
414 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
415 pubResponse.setResponseMessage("No Messages to send");
419 final long nowMs = Clock.now();
421 host = this.fHostSelector.selectBaseHost();
423 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
424 props.getProperty("partition"));
425 OutputStream os = null;
428 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
430 final String contentType = props.getProperty("contenttype");
431 if (contentType.equalsIgnoreCase("application/json")) {
432 JSONArray jsonArray = parseJSON();
433 os.write(jsonArray.toString().getBytes());
434 } else if (contentType.equalsIgnoreCase("text/plain")) {
435 for (TimestampedMessage m : fPending) {
436 os.write(m.fMsg.getBytes());
439 } else if (contentType.equalsIgnoreCase("application/cambria")
440 || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
441 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
442 os = new GZIPOutputStream(baseStream);
444 for (TimestampedMessage m : fPending) {
446 os.write(("" + m.fPartition.length()).getBytes());
448 os.write(("" + m.fMsg.length()).getBytes());
450 os.write(m.fPartition.getBytes());
451 os.write(m.fMsg.getBytes());
456 for (TimestampedMessage m : fPending) {
457 os.write(m.fMsg.getBytes());
462 final long startMs = Clock.now();
463 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
469 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
470 + (nowMs - fPending.peek().timestamp) + " ms");
471 sender.setPayload(os.toString());
473 String dmeResponse = sender.sendAndWait(5000L);
475 pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
477 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
478 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
482 final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
483 getLog().info(logLine);
486 } catch (DME2Exception x) {
487 getLog().warn(x.getMessage(), x);
488 pubResponse.setResponseCode(x.getErrorCode());
489 pubResponse.setResponseMessage(x.getErrorMessage());
490 } catch (URISyntaxException x) {
492 getLog().warn(x.getMessage(), x);
493 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
494 pubResponse.setResponseMessage(x.getMessage());
495 } catch (Exception x) {
497 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
498 pubResponse.setResponseMessage(x.getMessage());
499 logger.error("exception: ", x);
506 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
507 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
508 + (nowMs - fPending.peek().timestamp) + " ms");
509 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
510 authDate, username, password, protocolFlag);
511 // Here we are checking for error response. If HTTP status
512 // code is not within the http success response code
513 // then we consider this as error and return false
515 pubResponse = createMRPublisherResponse(result, pubResponse);
517 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
518 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
523 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
524 getLog().info(logLine);
529 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
530 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
531 + (nowMs - fPending.peek().timestamp) + " ms");
532 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
533 password, protocolFlag);
535 // Here we are checking for error response. If HTTP status
536 // code is not within the http success response code
537 // then we consider this as error and return false
538 pubResponse = createMRPublisherResponse(result, pubResponse);
540 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
541 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
546 final String logLine = String.valueOf((Clock.now() - startMs));
547 getLog().info(logLine);
552 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
553 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
554 + (nowMs - fPending.peek().timestamp) + " ms");
555 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
557 // Here we are checking for error response. If HTTP status
558 // code is not within the http success response code
559 // then we consider this as error and return false
560 pubResponse = createMRPublisherResponse(result, pubResponse);
562 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
563 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
568 final String logLine = String.valueOf((Clock.now() - startMs));
569 getLog().info(logLine);
573 } catch (IllegalArgumentException x) {
574 getLog().warn(x.getMessage(), x);
575 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
576 pubResponse.setResponseMessage(x.getMessage());
578 } catch (IOException x) {
579 getLog().warn(x.getMessage(), x);
580 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
581 pubResponse.setResponseMessage(x.getMessage());
583 } catch (HttpException x) {
584 getLog().warn(x.getMessage(), x);
585 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
586 pubResponse.setResponseMessage(x.getMessage());
588 } catch (Exception x) {
589 getLog().warn(x.getMessage(), x);
591 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
592 pubResponse.setResponseMessage(x.getMessage());
597 if (fPending.size() > 0) {
598 getLog().warn("Send failed, " + fPending.size() + " message to send.");
599 pubResponse.setPendingMsgs(fPending.size());
604 } catch (Exception x) {
605 getLog().warn(x.getMessage(), x);
606 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
607 pubResponse.setResponseMessage("Error in closing Output Stream");
615 public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
617 if (reply.isEmpty()) {
619 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
620 mrPubResponse.setResponseMessage("Please verify the Producer properties");
621 } else if (reply.startsWith("{")) {
622 JSONObject jObject = new JSONObject(reply);
623 if (jObject.has("message") && jObject.has("status")) {
624 String message = jObject.getString("message");
625 if (null != message) {
626 mrPubResponse.setResponseMessage(message);
628 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
630 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
631 mrPubResponse.setResponseMessage(reply);
633 } else if (reply.startsWith("<")) {
634 String responseCode = getHTTPErrorResponseCode(reply);
635 if (responseCode.contains("403")) {
636 responseCode = "403";
638 mrPubResponse.setResponseCode(responseCode);
639 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
642 return mrPubResponse;
645 private final String fTopic;
646 private final int fMaxBatchSize;
647 private final long fMaxBatchAgeMs;
648 private final boolean fCompress;
649 private int threadOccuranceTime;
650 private boolean fClosed;
651 private String username;
652 private String password;
656 private HostSelector fHostSelector = null;
658 private final LinkedBlockingQueue<TimestampedMessage> fPending;
659 private long fDontSendUntilMs;
660 private final ScheduledThreadPoolExecutor fExec;
662 private String latitude;
663 private String longitude;
664 private String version;
665 private String serviceName;
667 private String partner;
668 private String routeOffer;
669 private String subContextPath;
670 private String protocol;
671 private String methodType;
673 private String dmeuser;
674 private String dmepassword;
675 private String contentType;
676 private static final long sfWaitAfterError = 10000;
677 private HashMap<String, String> DMETimeOuts;
678 private DME2Client sender;
679 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
680 private String authKey;
681 private String authDate;
682 private String handlers;
683 private Properties props;
684 public static String routerFilePath;
685 protected static final Map<String, String> headers = new HashMap<String, String>();
686 public static MultivaluedMap<String, Object> headersMap;
688 private MRPublisherResponse pubResponse;
690 public MRPublisherResponse getPubResponse() {
694 public void setPubResponse(MRPublisherResponse pubResponse) {
695 this.pubResponse = pubResponse;
698 public static String getRouterFilePath() {
699 return routerFilePath;
702 public static void setRouterFilePath(String routerFilePath) {
703 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
706 public Properties getProps() {
710 public void setProps(Properties props) {
714 public String getProtocolFlag() {
718 public void setProtocolFlag(String protocolFlag) {
719 this.protocolFlag = protocolFlag;
722 private void DME2Configue() throws Exception {
726 * FileReader reader = new FileReader(new File (producerFilePath));
727 * Properties props = new Properties(); props.load(reader);
729 latitude = props.getProperty("Latitude");
730 longitude = props.getProperty("Longitude");
731 version = props.getProperty("Version");
732 serviceName = props.getProperty("ServiceName");
733 env = props.getProperty("Environment");
734 partner = props.getProperty("Partner");
735 routeOffer = props.getProperty("routeOffer");
736 subContextPath = props.getProperty("SubContextPath") + fTopic;
738 * if(props.getProperty("partition")!=null &&
739 * !props.getProperty("partition").equalsIgnoreCase("")){
740 * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
743 protocol = props.getProperty("Protocol");
744 methodType = props.getProperty("MethodType");
745 dmeuser = props.getProperty("username");
746 dmepassword = props.getProperty("password");
747 contentType = props.getProperty("contenttype");
748 handlers = props.getProperty("sessionstickinessrequired");
749 routerFilePath = props.getProperty("DME2preferredRouterFilePath");
752 * Changes to DME2Client url to use Partner for auto failover
753 * between data centers When Partner value is not provided use the
754 * routeOffer value for auto failover within a cluster
757 String partitionKey = props.getProperty("partition");
759 if (partner != null && !partner.isEmpty()) {
760 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
762 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
763 url = url + "&partitionKey=" + partitionKey;
765 } else if (routeOffer != null && !routeOffer.isEmpty()) {
766 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
768 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
769 url = url + "&partitionKey=" + partitionKey;
773 DMETimeOuts = new HashMap<String, String>();
774 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
775 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
776 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
777 DMETimeOuts.put("Content-Type", contentType);
778 System.setProperty("AFT_LATITUDE", latitude);
779 System.setProperty("AFT_LONGITUDE", longitude);
780 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
781 // System.setProperty("DME2.DEBUG", "true");
784 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
785 // "SSLv3,TLSv1,TLSv1.1");
786 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
787 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
788 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
792 sender = new DME2Client(new URI(url), 5000L);
794 sender.setAllowAllHttpReturnCodes(true);
795 sender.setMethod(methodType);
796 sender.setSubContext(subContextPath);
797 sender.setCredentials(dmeuser, dmepassword);
798 sender.setHeaders(DMETimeOuts);
799 if (handlers.equalsIgnoreCase("yes")) {
800 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
801 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
802 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
803 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
804 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
806 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
808 } catch (DME2Exception x) {
809 getLog().warn(x.getMessage(), x);
810 throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
811 } catch (URISyntaxException x) {
813 getLog().warn(x.getMessage(), x);
814 throw new URISyntaxException(url, x.getMessage());
815 } catch (Exception x) {
817 getLog().warn(x.getMessage(), x);
818 throw new IllegalArgumentException(x.getMessage());
822 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
823 boolean compress) throws MalformedURLException {
826 if (topic == null || topic.length() < 1) {
827 throw new IllegalArgumentException("A topic must be provided.");
830 fHostSelector = new HostSelector(hosts, null);
833 fMaxBatchSize = maxBatchSize;
834 fMaxBatchAgeMs = maxBatchAgeMs;
835 fCompress = compress;
837 fPending = new LinkedBlockingQueue<TimestampedMessage>();
838 fDontSendUntilMs = 0;
839 fExec = new ScheduledThreadPoolExecutor(1);
840 pubResponse = new MRPublisherResponse();
844 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
845 boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
848 if (topic == null || topic.length() < 1) {
849 throw new IllegalArgumentException("A topic must be provided.");
852 fHostSelector = new HostSelector(hosts, null);
855 fMaxBatchSize = maxBatchSize;
856 fMaxBatchAgeMs = maxBatchAgeMs;
857 fCompress = compress;
858 threadOccuranceTime = httpThreadOccurnace;
859 fPending = new LinkedBlockingQueue<TimestampedMessage>();
860 fDontSendUntilMs = 0;
861 fExec = new ScheduledThreadPoolExecutor(1);
862 fExec.scheduleAtFixedRate(new Runnable() {
867 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
870 private static class TimestampedMessage extends message {
871 public TimestampedMessage(message m) {
873 timestamp = Clock.now();
876 public final long timestamp;
879 public String getUsername() {
883 public void setUsername(String username) {
884 this.username = username;
887 public String getPassword() {
891 public void setPassword(String password) {
892 this.password = password;
895 public String getHost() {
899 public void setHost(String host) {
903 public String getContentType() {
907 public void setContentType(String contentType) {
908 this.contentType = contentType;
911 public String getAuthKey() {
915 public void setAuthKey(String authKey) {
916 this.authKey = authKey;
919 public String getAuthDate() {
923 public void setAuthDate(String authDate) {
924 this.authDate = authDate;