1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
41 import javax.ws.rs.core.MultivaluedMap;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import com.att.nsa.mr.client.HostSelector;
55 import com.att.nsa.mr.client.MRBatchingPublisher;
56 import com.att.nsa.mr.client.response.MRPublisherResponse;
57 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60 private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
62 public static class Builder {
66 public Builder againstUrls(Collection<String> baseUrls) {
71 public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
74 fServiceName = serviceName;
75 fTransportype = transportype;
79 public Builder onTopic(String topic) {
84 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
85 fMaxBatchSize = maxBatchSize;
86 fMaxBatchAgeMs = maxBatchAgeMs;
90 public Builder compress(boolean compress) {
95 public Builder httpThreadTime(int threadOccuranceTime) {
96 this.threadOccuranceTime = threadOccuranceTime;
100 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
101 fAllowSelfSignedCerts = allowSelfSignedCerts;
105 public Builder withResponse(boolean withResponse) {
106 fWithResponse = withResponse;
110 public MRSimplerBatchPublisher build() {
111 if (!fWithResponse) {
113 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
114 fAllowSelfSignedCerts, threadOccuranceTime);
115 } catch (MalformedURLException e) {
116 throw new IllegalArgumentException(e);
120 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
121 fAllowSelfSignedCerts, fMaxBatchSize);
122 } catch (MalformedURLException e) {
123 throw new IllegalArgumentException(e);
129 private Collection<String> fUrls;
130 private Collection<String> fServiceName;
131 private String fTransportype;
132 private String fTopic;
133 private int fMaxBatchSize = 100;
134 private long fMaxBatchAgeMs = 1000;
135 private boolean fCompress = false;
136 private int threadOccuranceTime = 50;
137 private boolean fAllowSelfSignedCerts = false;
138 private boolean fWithResponse = false;
143 public int send(String partition, String msg) {
144 return send(new message(partition, msg));
148 public int send(String msg) {
149 return send(new message(null, msg));
153 public int send(message msg) {
154 final LinkedList<message> list = new LinkedList<message>();
160 public synchronized int send(Collection<message> msgs) {
162 throw new IllegalStateException("The publisher was closed.");
165 for (message userMsg : msgs) {
166 fPending.add(new TimestampedMessage(userMsg));
168 return getPendingMessageCount();
172 public synchronized int getPendingMessageCount() {
173 return fPending.size();
177 public void close() {
179 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
180 if (remains.isEmpty()) {
181 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
182 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
184 } catch (InterruptedException e) {
185 getLog().warn("Possible message loss. " + e.getMessage(), e);
186 Thread.currentThread().interrupt();
187 } catch (IOException e) {
188 getLog().warn("Possible message loss. " + e.getMessage(), e);
193 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
194 synchronized (this) {
197 // stop the background sender
198 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
199 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
203 final long now = Clock.now();
204 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
205 final long timeoutAtMs = now + waitInMs;
207 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
212 synchronized (this) {
213 final LinkedList<message> result = new LinkedList<message>();
214 fPending.drainTo(result);
220 * Possibly send a batch to the MR server. This is called by the background
221 * thread and the close() method
225 private synchronized void send(boolean force) {
226 if (force || shouldSendNow()) {
228 getLog().warn("Send failed, " + fPending.size() + " message to send.");
230 // note the time for back-off
231 fDontSendUntilMs = sfWaitAfterError + Clock.now();
236 private synchronized boolean shouldSendNow() {
237 boolean shouldSend = false;
238 if (fPending.size() > 0) {
239 final long nowMs = Clock.now();
241 shouldSend = (fPending.size() >= fMaxBatchSize);
243 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
244 shouldSend = sendAtMs <= nowMs;
247 // however, wait after an error
248 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
254 * Method to parse published JSON Objects and Arrays
258 private JSONArray parseJSON() {
259 JSONArray jsonArray = new JSONArray();
260 for (TimestampedMessage m : fPending) {
261 JSONTokener jsonTokener = new JSONTokener(m.fMsg);
262 JSONObject jsonObject = null;
263 JSONArray tempjsonArray = null;
264 final char firstChar = jsonTokener.next();
266 if ('[' == firstChar) {
267 tempjsonArray = new JSONArray(jsonTokener);
268 if (null != tempjsonArray) {
269 for (int i = 0; i < tempjsonArray.length(); i++) {
270 jsonArray.put(tempjsonArray.getJSONObject(i));
274 jsonObject = new JSONObject(jsonTokener);
275 jsonArray.put(jsonObject);
282 private synchronized boolean sendBatch() {
283 // it's possible for this call to be made with an empty list. in this
284 // case, just return.
285 if (fPending.size() < 1) {
289 final long nowMs = Clock.now();
291 if (this.fHostSelector != null) {
292 host = this.fHostSelector.selectBaseHost();
295 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
296 props.getProperty("partition"));
300 * final String contentType = fCompress ?
301 * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
304 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
305 OutputStream os = baseStream;
306 final String contentType = props.getProperty("contenttype");
307 if (contentType.equalsIgnoreCase("application/json")) {
308 JSONArray jsonArray = parseJSON();
309 os.write(jsonArray.toString().getBytes());
312 } else if (contentType.equalsIgnoreCase("text/plain")) {
313 for (TimestampedMessage m : fPending) {
314 os.write(m.fMsg.getBytes());
318 } else if (contentType.equalsIgnoreCase("application/cambria")
319 || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
320 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
321 os = new GZIPOutputStream(baseStream);
323 for (TimestampedMessage m : fPending) {
325 os.write(("" + m.fPartition.length()).getBytes());
327 os.write(("" + m.fMsg.length()).getBytes());
329 os.write(m.fPartition.getBytes());
330 os.write(m.fMsg.getBytes());
335 for (TimestampedMessage m : fPending) {
336 os.write(m.fMsg.getBytes());
342 final long startMs = Clock.now();
343 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
348 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
349 + (nowMs - fPending.peek().timestamp) + " ms");
350 sender.setPayload(os.toString());
351 String dmeResponse = sender.sendAndWait(5000L);
353 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
354 getLog().info(logLine);
359 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
360 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
361 + (nowMs - fPending.peek().timestamp) + " ms");
362 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
363 username, password, protocolFlag);
364 // Here we are checking for error response. If HTTP status
365 // code is not within the http success response code
366 // then we consider this as error and return false
367 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
370 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
371 getLog().info(logLine);
376 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
377 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
378 + (nowMs - fPending.peek().timestamp) + " ms");
379 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
382 // Here we are checking for error response. If HTTP status
383 // code is not within the http success response code
384 // then we consider this as error and return false
385 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
388 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
389 getLog().info(logLine);
394 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
395 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
396 + (nowMs - fPending.peek().timestamp) + " ms");
397 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
399 // Here we are checking for error response. If HTTP status
400 // code is not within the http success response code
401 // then we consider this as error and return false
402 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
405 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
406 getLog().info(logLine);
410 } catch (IllegalArgumentException x) {
411 getLog().warn(x.getMessage(), x);
412 } catch (IOException x) {
413 getLog().warn(x.getMessage(), x);
414 } catch (HttpException x) {
415 getLog().warn(x.getMessage(), x);
416 } catch (Exception x) {
417 getLog().warn(x.getMessage(), x);
422 public synchronized MRPublisherResponse sendBatchWithResponse() {
423 // it's possible for this call to be made with an empty list. in this
424 // case, just return.
425 if (fPending.size() < 1) {
426 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
427 pubResponse.setResponseMessage("No Messages to send");
431 final long nowMs = Clock.now();
433 host = this.fHostSelector.selectBaseHost();
435 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
436 props.getProperty("partition"));
437 OutputStream os = null;
440 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
442 final String contentType = props.getProperty("contenttype");
443 if (contentType.equalsIgnoreCase("application/json")) {
444 JSONArray jsonArray = parseJSON();
445 os.write(jsonArray.toString().getBytes());
446 } else if (contentType.equalsIgnoreCase("text/plain")) {
447 for (TimestampedMessage m : fPending) {
448 os.write(m.fMsg.getBytes());
451 } else if (contentType.equalsIgnoreCase("application/cambria")
452 || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
453 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
454 os = new GZIPOutputStream(baseStream);
456 for (TimestampedMessage m : fPending) {
458 os.write(("" + m.fPartition.length()).getBytes());
460 os.write(("" + m.fMsg.length()).getBytes());
462 os.write(m.fPartition.getBytes());
463 os.write(m.fMsg.getBytes());
468 for (TimestampedMessage m : fPending) {
469 os.write(m.fMsg.getBytes());
474 final long startMs = Clock.now();
475 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
481 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
482 + (nowMs - fPending.peek().timestamp) + " ms");
483 sender.setPayload(os.toString());
485 String dmeResponse = sender.sendAndWait(5000L);
487 pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
489 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
490 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
494 final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
495 getLog().info(logLine);
498 } catch (DME2Exception x) {
499 getLog().warn(x.getMessage(), x);
500 pubResponse.setResponseCode(x.getErrorCode());
501 pubResponse.setResponseMessage(x.getErrorMessage());
502 } catch (URISyntaxException x) {
504 getLog().warn(x.getMessage(), x);
505 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
506 pubResponse.setResponseMessage(x.getMessage());
507 } catch (Exception x) {
509 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
510 pubResponse.setResponseMessage(x.getMessage());
511 logger.error("exception: ", x);
518 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
519 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
520 + (nowMs - fPending.peek().timestamp) + " ms");
521 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
522 authDate, username, password, protocolFlag);
523 // Here we are checking for error response. If HTTP status
524 // code is not within the http success response code
525 // then we consider this as error and return false
527 pubResponse = createMRPublisherResponse(result, pubResponse);
529 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
530 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
535 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
536 getLog().info(logLine);
541 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
542 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
543 + (nowMs - fPending.peek().timestamp) + " ms");
544 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
545 password, protocolFlag);
547 // Here we are checking for error response. If HTTP status
548 // code is not within the http success response code
549 // then we consider this as error and return false
550 pubResponse = createMRPublisherResponse(result, pubResponse);
552 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
553 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
558 final String logLine = String.valueOf((Clock.now() - startMs));
559 getLog().info(logLine);
564 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
565 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
566 + (nowMs - fPending.peek().timestamp) + " ms");
567 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
569 // Here we are checking for error response. If HTTP status
570 // code is not within the http success response code
571 // then we consider this as error and return false
572 pubResponse = createMRPublisherResponse(result, pubResponse);
574 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
575 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
580 final String logLine = String.valueOf((Clock.now() - startMs));
581 getLog().info(logLine);
585 } catch (IllegalArgumentException x) {
586 getLog().warn(x.getMessage(), x);
587 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
588 pubResponse.setResponseMessage(x.getMessage());
590 } catch (IOException x) {
591 getLog().warn(x.getMessage(), x);
592 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
593 pubResponse.setResponseMessage(x.getMessage());
595 } catch (HttpException x) {
596 getLog().warn(x.getMessage(), x);
597 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
598 pubResponse.setResponseMessage(x.getMessage());
600 } catch (Exception x) {
601 getLog().warn(x.getMessage(), x);
603 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
604 pubResponse.setResponseMessage(x.getMessage());
609 if (fPending.size() > 0) {
610 getLog().warn("Send failed, " + fPending.size() + " message to send.");
611 pubResponse.setPendingMsgs(fPending.size());
616 } catch (Exception x) {
617 getLog().warn(x.getMessage(), x);
618 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
619 pubResponse.setResponseMessage("Error in closing Output Stream");
627 public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
629 if (reply.isEmpty()) {
631 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
632 mrPubResponse.setResponseMessage("Please verify the Producer properties");
633 } else if (reply.startsWith("{")) {
634 JSONObject jObject = new JSONObject(reply);
635 if (jObject.has("message") && jObject.has("status")) {
636 String message = jObject.getString("message");
637 if (null != message) {
638 mrPubResponse.setResponseMessage(message);
640 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
642 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
643 mrPubResponse.setResponseMessage(reply);
645 } else if (reply.startsWith("<")) {
646 String responseCode = getHTTPErrorResponseCode(reply);
647 if (responseCode.contains("403")) {
648 responseCode = "403";
650 mrPubResponse.setResponseCode(responseCode);
651 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
654 return mrPubResponse;
657 private final String fTopic;
658 private final int fMaxBatchSize;
659 private final long fMaxBatchAgeMs;
660 private final boolean fCompress;
661 private int threadOccuranceTime;
662 private boolean fClosed;
663 private String username;
664 private String password;
668 private HostSelector fHostSelector = null;
670 private final LinkedBlockingQueue<TimestampedMessage> fPending;
671 private long fDontSendUntilMs;
672 private final ScheduledThreadPoolExecutor fExec;
674 private String latitude;
675 private String longitude;
676 private String version;
677 private String serviceName;
679 private String partner;
680 private String routeOffer;
681 private String subContextPath;
682 private String protocol;
683 private String methodType;
685 private String dmeuser;
686 private String dmepassword;
687 private String contentType;
688 private static final long sfWaitAfterError = 10000;
689 private HashMap<String, String> DMETimeOuts;
690 private DME2Client sender;
691 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
692 private String authKey;
693 private String authDate;
694 private String handlers;
695 private Properties props;
696 public static String routerFilePath;
697 protected static final Map<String, String> headers = new HashMap<String, String>();
698 public static MultivaluedMap<String, Object> headersMap;
700 private MRPublisherResponse pubResponse;
702 public MRPublisherResponse getPubResponse() {
706 public void setPubResponse(MRPublisherResponse pubResponse) {
707 this.pubResponse = pubResponse;
710 public static String getRouterFilePath() {
711 return routerFilePath;
714 public static void setRouterFilePath(String routerFilePath) {
715 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
718 public Properties getProps() {
722 public void setProps(Properties props) {
726 public String getProtocolFlag() {
730 public void setProtocolFlag(String protocolFlag) {
731 this.protocolFlag = protocolFlag;
734 private void DME2Configue() throws Exception {
738 * FileReader reader = new FileReader(new File (producerFilePath));
739 * Properties props = new Properties(); props.load(reader);
741 latitude = props.getProperty("Latitude");
742 longitude = props.getProperty("Longitude");
743 version = props.getProperty("Version");
744 serviceName = props.getProperty("ServiceName");
745 env = props.getProperty("Environment");
746 partner = props.getProperty("Partner");
747 routeOffer = props.getProperty("routeOffer");
748 subContextPath = props.getProperty("SubContextPath") + fTopic;
750 * if(props.getProperty("partition")!=null &&
751 * !props.getProperty("partition").equalsIgnoreCase("")){
752 * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
755 protocol = props.getProperty("Protocol");
756 methodType = props.getProperty("MethodType");
757 dmeuser = props.getProperty("username");
758 dmepassword = props.getProperty("password");
759 contentType = props.getProperty("contenttype");
760 handlers = props.getProperty("sessionstickinessrequired");
761 routerFilePath = props.getProperty("DME2preferredRouterFilePath");
764 * Changes to DME2Client url to use Partner for auto failover
765 * between data centers When Partner value is not provided use the
766 * routeOffer value for auto failover within a cluster
769 String partitionKey = props.getProperty("partition");
771 if (partner != null && !partner.isEmpty()) {
772 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
774 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
775 url = url + "&partitionKey=" + partitionKey;
777 } else if (routeOffer != null && !routeOffer.isEmpty()) {
778 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
780 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
781 url = url + "&partitionKey=" + partitionKey;
785 DMETimeOuts = new HashMap<String, String>();
786 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
787 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
788 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
789 DMETimeOuts.put("Content-Type", contentType);
790 System.setProperty("AFT_LATITUDE", latitude);
791 System.setProperty("AFT_LONGITUDE", longitude);
792 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
793 // System.setProperty("DME2.DEBUG", "true");
796 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
797 // "SSLv3,TLSv1,TLSv1.1");
798 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
799 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
800 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
804 sender = new DME2Client(new URI(url), 5000L);
806 sender.setAllowAllHttpReturnCodes(true);
807 sender.setMethod(methodType);
808 sender.setSubContext(subContextPath);
809 sender.setCredentials(dmeuser, dmepassword);
810 sender.setHeaders(DMETimeOuts);
811 if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
812 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
813 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
814 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
815 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
816 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
818 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
820 } catch (DME2Exception x) {
821 getLog().warn(x.getMessage(), x);
822 throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
823 } catch (URISyntaxException x) {
825 getLog().warn(x.getMessage(), x);
826 throw new URISyntaxException(url, x.getMessage());
827 } catch (Exception x) {
829 getLog().warn(x.getMessage(), x);
830 throw new IllegalArgumentException(x.getMessage());
834 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
835 boolean compress) throws MalformedURLException {
838 if (topic == null || topic.length() < 1) {
839 throw new IllegalArgumentException("A topic must be provided.");
842 fHostSelector = new HostSelector(hosts, null);
845 fMaxBatchSize = maxBatchSize;
846 fMaxBatchAgeMs = maxBatchAgeMs;
847 fCompress = compress;
849 fPending = new LinkedBlockingQueue<TimestampedMessage>();
850 fDontSendUntilMs = 0;
851 fExec = new ScheduledThreadPoolExecutor(1);
852 pubResponse = new MRPublisherResponse();
856 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
857 boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
860 if (topic == null || topic.length() < 1) {
861 throw new IllegalArgumentException("A topic must be provided.");
864 fHostSelector = new HostSelector(hosts, null);
867 fMaxBatchSize = maxBatchSize;
868 fMaxBatchAgeMs = maxBatchAgeMs;
869 fCompress = compress;
870 threadOccuranceTime = httpThreadOccurnace;
871 fPending = new LinkedBlockingQueue<TimestampedMessage>();
872 fDontSendUntilMs = 0;
873 fExec = new ScheduledThreadPoolExecutor(1);
874 fExec.scheduleAtFixedRate(new Runnable() {
879 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
882 private static class TimestampedMessage extends message {
883 public TimestampedMessage(message m) {
885 timestamp = Clock.now();
888 public final long timestamp;
891 public String getUsername() {
895 public void setUsername(String username) {
896 this.username = username;
899 public String getPassword() {
903 public void setPassword(String password) {
904 this.password = password;
907 public String getHost() {
911 public void setHost(String host) {
915 public String getContentType() {
919 public void setContentType(String contentType) {
920 this.contentType = contentType;
923 public String getAuthKey() {
927 public void setAuthKey(String authKey) {
928 this.authKey = authKey;
931 public String getAuthDate() {
935 public void setAuthDate(String authDate) {
936 this.authDate = authDate;