1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
41 import javax.ws.rs.core.MultivaluedMap;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import org.onap.dmaap.mr.client.HostSelector;
55 import org.onap.dmaap.mr.client.MRBatchingPublisher;
56 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
57 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
/**
 * Batching publisher for the DMaaP Message Router (MR).
 *
 * Messages handed to the send(...) methods are wrapped in a TimestampedMessage and
 * queued in fPending; batches are then posted to MR over one of the transports
 * selected by protocolFlag (DME2, AUTH_KEY, AAF_AUTH or HTTPNOAUTH). Instances are
 * created through the nested Builder.
 */
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60 private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
62 public static class Builder {
66 public Builder againstUrls(Collection<String> baseUrls) {
71 public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
74 fServiceName = serviceName;
75 fTransportype = transportype;
79 public Builder onTopic(String topic) {
84 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
85 fMaxBatchSize = maxBatchSize;
86 fMaxBatchAgeMs = maxBatchAgeMs;
90 public Builder compress(boolean compress) {
95 public Builder httpThreadTime(int threadOccuranceTime) {
96 this.threadOccuranceTime = threadOccuranceTime;
100 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
101 fAllowSelfSignedCerts = allowSelfSignedCerts;
105 public Builder withResponse(boolean withResponse) {
106 fWithResponse = withResponse;
110 public MRSimplerBatchPublisher build() {
111 if (!fWithResponse) {
113 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
114 fAllowSelfSignedCerts, threadOccuranceTime);
115 } catch (MalformedURLException e) {
116 throw new IllegalArgumentException(e);
120 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
121 fAllowSelfSignedCerts, fMaxBatchSize);
122 } catch (MalformedURLException e) {
123 throw new IllegalArgumentException(e);
129 private Collection<String> fUrls;
130 private Collection<String> fServiceName;
131 private String fTransportype;
132 private String fTopic;
133 private int fMaxBatchSize = 100;
135 private long fMaxBatchAgeMs = 1000;
136 private boolean fCompress = false;
137 private int threadOccuranceTime = 50;
138 private boolean fAllowSelfSignedCerts = false;
139 private boolean fWithResponse = false;
144 public int send(String partition, String msg) {
145 return send(new message(partition, msg));
149 public int send(String msg) {
150 return send(new message(null, msg));
// Single-message overload: creates a work list for the message.
// NOTE(review): the rest of the body is not visible in this extract; presumably the
// message is added to the list and forwarded to send(Collection) — confirm.
154 public int send(message msg) {
155 final LinkedList<message> list = new LinkedList<>();
// Queues every supplied message, wrapped in a TimestampedMessage, on fPending and
// returns the resulting pending count. Throws IllegalStateException
// ("The publisher was closed.") under a guard whose condition is not visible here.
// NOTE(review): presumably the guard tests fClosed — confirm.
161 public synchronized int send(Collection<message> msgs) {
163 throw new IllegalStateException("The publisher was closed.");
166 for (message userMsg : msgs) {
167 fPending.add(new TimestampedMessage(userMsg));
169 return getPendingMessageCount();
173 public synchronized int getPendingMessageCount() {
174 return fPending.size();
178 public void close() {
180 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
181 if (remains.isEmpty()) {
182 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
185 } catch (InterruptedException e) {
186 getLog().warn("Possible message loss. " + e.getMessage(), e);
187 Thread.currentThread().interrupt();
188 } catch (IOException e) {
189 getLog().warn("Possible message loss. " + e.getMessage(), e);
194 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
195 synchronized (this) {
198 // stop the background sender
199 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
200 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
204 final long now = Clock.now();
205 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
206 final long timeoutAtMs = now + waitInMs;
208 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
213 synchronized (this) {
214 final LinkedList<message> result = new LinkedList<>();
215 fPending.drainTo(result);
221 * Possibly send a batch to the MR server. This is called by the background
222 * thread and the close() method
226 private synchronized void send(boolean force) {
227 if ((force || shouldSendNow()) && !sendBatch()) {
228 getLog().warn("Send failed, " + fPending.size() + " message to send.");
230 // note the time for back-off
231 fDontSendUntilMs = sfWaitAfterError + Clock.now();
// Decides whether a batch is due. With a non-empty queue, the visible checks are:
// the queue has reached fMaxBatchSize, the oldest message (queue head) is at least
// fMaxBatchAgeMs old, and the error back-off time fDontSendUntilMs has passed.
// NOTE(review): the line that guards the age check is not visible in this extract;
// presumably the age is only consulted when the size check failed — confirm.
235 private synchronized boolean shouldSendNow() {
236 boolean shouldSend = false;
237 if (fPending.size()>0) {
238 final long nowMs = Clock.now();
240 shouldSend = (fPending.size() >= fMaxBatchSize);
242 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
243 shouldSend = sendAtMs <= nowMs;
246 // however, wait after an error
247 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
253 * Method to parse published JSON Objects and Arrays
// Builds one JSONArray from all pending messages. Each message text is tokenized and
// its first character inspected: a '[' payload is parsed as a JSONArray whose
// elements are appended individually (each must be a JSON object, since
// getJSONObject is used); otherwise the payload is parsed as a single JSONObject.
// NOTE(review): next() consumes the first character before the JSONArray/JSONObject
// constructors run; a line is missing from this extract right after it — presumably
// the tokener is rewound there. Confirm.
// NOTE(review): the null check on tempjsonArray directly follows 'new' and can never fail.
257 private JSONArray parseJSON() {
258 JSONArray jsonArray = new JSONArray();
259 for (TimestampedMessage m : fPending) {
260 JSONTokener jsonTokener = new JSONTokener(m.fMsg);
261 JSONObject jsonObject = null;
262 JSONArray tempjsonArray = null;
263 final char firstChar = jsonTokener.next();
265 if ('[' == firstChar) {
266 tempjsonArray = new JSONArray(jsonTokener);
267 if (null != tempjsonArray) {
268 for (int i = 0; i < tempjsonArray.length(); i++) {
269 jsonArray.put(tempjsonArray.getJSONObject(i));
273 jsonObject = new JSONObject(jsonTokener);
274 jsonArray.put(jsonObject);
// Serializes all pending messages into one payload and posts it to MR using the
// transport selected by protocolFlag. Many lines of this method are missing from the
// reviewed extract (returns, closing braces, queue clean-up), so the notes below
// describe only what the visible lines do.
281 private synchronized boolean sendBatch() {
282 // it's possible for this call to be made with an empty list. in this
283 // case, just return.
284 if (fPending.isEmpty()) {
288 final long nowMs = Clock.now();
// Pick the target host from the selector when one is configured.
290 if (this.fHostSelector != null) {
291 host = this.fHostSelector.selectBaseHost();
294 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
295 props.getProperty("partition"));
// Payload encoding is chosen by the "contenttype" property.
// NOTE(review): contentType is dereferenced without a null check, so a missing
// property raises NullPointerException here.
// NOTE(review): every getBytes() call below uses the platform default charset.
299 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
300 OutputStream os = baseStream;
301 final String contentType = props.getProperty("contenttype");
302 if (contentType.equalsIgnoreCase("application/json")) {
// JSON: all pending messages are merged into one JSON array.
303 JSONArray jsonArray = parseJSON();
304 os.write(jsonArray.toString().getBytes());
307 } else if (contentType.equalsIgnoreCase("text/plain")) {
// Plain text: message bodies are written one after another.
308 for (TimestampedMessage m : fPending) {
309 os.write(m.fMsg.getBytes());
313 } else if (contentType.equalsIgnoreCase("application/cambria")
314 || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
// Cambria framing: partition length, message length, partition, message.
// The zip variant routes the same bytes through a GZIPOutputStream.
// NOTE(review): length() counts chars while getBytes() may emit more bytes for
// non-ASCII text — confirm the wire format expects character counts.
315 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
316 os = new GZIPOutputStream(baseStream);
318 for (TimestampedMessage m : fPending) {
320 os.write(("" + m.fPartition.length()).getBytes());
322 os.write(("" + m.fMsg.length()).getBytes());
324 os.write(m.fPartition.getBytes());
325 os.write(m.fMsg.getBytes());
// Fallback for any other content type: raw message bodies.
330 for (TimestampedMessage m : fPending) {
331 os.write(m.fMsg.getBytes());
337 final long startMs = Clock.now();
// DME2 transport: payload is handed to the DME2 client with a 5000 ms wait.
// NOTE(review): os.toString() yields the payload text only while os is the
// ByteArrayOutputStream; if os was replaced by the GZIPOutputStream above it is
// just the object's default string — confirm DME2 is never used with cambria-zip.
338 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
344 .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
345 nowMs - fPending.peek().timestamp));
346 sender.setPayload(os.toString());
347 String dmeResponse = sender.sendAndWait(5000L);
349 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
350 getLog().info(logLine);
// API-key authenticated HTTP transport.
355 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
356 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
357 nowMs - fPending.peek().timestamp);
358 final JSONObject result =
359 postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
360 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
361 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
362 // Here we are checking for error response. If HTTP status
363 // code is not within the http success response code
364 // then we consider this as error and return false
365 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
368 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
369 getLog().info(logLine);
// AAF (username/password) authenticated HTTP transport.
374 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
375 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
376 nowMs - fPending.peek().timestamp);
377 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
380 // Here we are checking for error response. If HTTP status
381 // code is not within the http success response code
382 // then we consider this as error and return false
383 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
386 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
387 getLog().info(logLine);
// Unauthenticated HTTP transport.
392 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
393 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
394 nowMs - fPending.peek().timestamp);
395 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
397 // Here we are checking for error response. If HTTP status
398 // code is not within the http success response code
399 // then we consider this as error and return false
400 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
403 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
404 getLog().info(logLine);
// Any failure while building or posting the batch is logged as a warning.
408 } catch (Exception x) {
409 getLog().warn(x.getMessage(), x);
// Variant of sendBatch() that reports the outcome through the shared pubResponse
// object (response code, message, pending count) instead of a boolean. Many lines are
// missing from the reviewed extract, so the notes describe only the visible lines.
414 public synchronized MRPublisherResponse sendBatchWithResponse() {
415 // it's possible for this call to be made with an empty list. in this
416 // case, just return.
// An empty queue is reported as 400 / "No Messages to send".
417 if (fPending.isEmpty()) {
418 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
419 pubResponse.setResponseMessage("No Messages to send");
423 final long nowMs = Clock.now();
425 host = this.fHostSelector.selectBaseHost();
427 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
428 props.getProperty("partition"));
// NOTE(review): os starts as null and no assignment is visible before the first
// write; presumably a missing line points it at baseStream — confirm.
429 OutputStream os = null;
432 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
// Payload encoding is chosen by the "contenttype" property, as in sendBatch().
// NOTE(review): contentType is dereferenced without a null check, and getBytes()
// uses the platform default charset throughout.
434 final String contentType = props.getProperty("contenttype");
435 if (contentType.equalsIgnoreCase("application/json")) {
436 JSONArray jsonArray = parseJSON();
437 os.write(jsonArray.toString().getBytes());
438 } else if (contentType.equalsIgnoreCase("text/plain")) {
439 for (TimestampedMessage m : fPending) {
440 os.write(m.fMsg.getBytes());
443 } else if (contentType.equalsIgnoreCase("application/cambria")
444 || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
// Cambria framing: partition length, message length, partition, message;
// the zip variant writes through a GZIPOutputStream.
445 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
446 os = new GZIPOutputStream(baseStream);
448 for (TimestampedMessage m : fPending) {
450 os.write(("" + m.fPartition.length()).getBytes());
452 os.write(("" + m.fMsg.length()).getBytes());
454 os.write(m.fPartition.getBytes());
455 os.write(m.fMsg.getBytes());
// Fallback for any other content type: raw message bodies.
460 for (TimestampedMessage m : fPending) {
461 os.write(m.fMsg.getBytes());
466 final long startMs = Clock.now();
// DME2 transport: the reply text is converted by createMRPublisherResponse and the
// resulting code is checked against the 200..299 range.
467 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
473 getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
474 nowMs - fPending.peek().timestamp);
475 sender.setPayload(os.toString());
477 String dmeResponse = sender.sendAndWait(5000L);
479 pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
481 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
482 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
486 final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
487 getLog().info(logLine);
// DME2 failures carry their own error code/message into the response.
490 } catch (DME2Exception x) {
491 getLog().warn(x.getMessage(), x);
492 pubResponse.setResponseCode(x.getErrorCode());
493 pubResponse.setResponseMessage(x.getErrorMessage());
494 } catch (URISyntaxException x) {
496 getLog().warn(x.getMessage(), x);
497 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
498 pubResponse.setResponseMessage(x.getMessage());
499 } catch (Exception x) {
501 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
502 pubResponse.setResponseMessage(x.getMessage());
503 logger.error("exception: ", x);
// API-key authenticated HTTP transport.
510 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
511 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
512 nowMs - fPending.peek().timestamp);
513 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
514 authDate, username, password, protocolFlag);
515 // Here we are checking for error response. If HTTP status
516 // code is not within the http success response code
517 // then we consider this as error and return false
519 pubResponse = createMRPublisherResponse(result, pubResponse);
521 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
522 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
527 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
528 getLog().info(logLine);
// AAF (username/password) authenticated HTTP transport.
533 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
534 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
535 nowMs - fPending.peek().timestamp);
536 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
537 password, protocolFlag);
539 // Here we are checking for error response. If HTTP status
540 // code is not within the http success response code
541 // then we consider this as error and return false
542 pubResponse = createMRPublisherResponse(result, pubResponse);
544 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
545 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
550 final String logLine = String.valueOf((Clock.now() - startMs));
551 getLog().info(logLine);
// Unauthenticated HTTP transport.
556 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
557 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
558 nowMs - fPending.peek().timestamp);
559 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
561 // Here we are checking for error response. If HTTP status
562 // code is not within the http success response code
563 // then we consider this as error and return false
564 pubResponse = createMRPublisherResponse(result, pubResponse);
566 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
567 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
572 final String logLine = String.valueOf((Clock.now() - startMs));
573 getLog().info(logLine);
// Error mapping: bad arguments / HTTP protocol errors -> 400, everything else -> 500.
577 } catch (IllegalArgumentException | HttpException x) {
578 getLog().warn(x.getMessage(), x);
579 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
580 pubResponse.setResponseMessage(x.getMessage());
582 } catch (IOException x) {
583 getLog().warn(x.getMessage(), x);
584 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
585 pubResponse.setResponseMessage(x.getMessage());
586 } catch (Exception x) {
587 getLog().warn(x.getMessage(), x);
589 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
590 pubResponse.setResponseMessage(x.getMessage());
// Messages still queued at this point are reported as a failed send.
595 if (!fPending.isEmpty()) {
596 getLog().warn("Send failed, " + fPending.size() + " message to send.");
597 pubResponse.setPendingMsgs(fPending.size());
602 } catch (Exception x) {
603 getLog().warn(x.getMessage(), x);
604 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
605 pubResponse.setResponseMessage("Error in closing Output Stream");
613 public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
615 if (reply.isEmpty()) {
617 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
618 mrPubResponse.setResponseMessage("Please verify the Producer properties");
619 } else if (reply.startsWith("{")) {
620 JSONObject jObject = new JSONObject(reply);
621 if (jObject.has("message") && jObject.has("status")) {
622 String message = jObject.getString("message");
623 if (null != message) {
624 mrPubResponse.setResponseMessage(message);
626 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
628 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
629 mrPubResponse.setResponseMessage(reply);
631 } else if (reply.startsWith("<")) {
632 String responseCode = getHTTPErrorResponseCode(reply);
633 if (responseCode.contains("403")) {
634 responseCode = "403";
636 mrPubResponse.setResponseCode(responseCode);
637 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
640 return mrPubResponse;
// --- batching configuration (set once in the constructors) ---
643 private final String fTopic;
644 private final int fMaxBatchSize;
645 private final long fMaxBatchAgeMs;
646 private final boolean fCompress;
// period (ms) of the background sender task scheduled in the 7-argument constructor
647 private int threadOccuranceTime;
648 private boolean fClosed;
649 private String username;
650 private String password;
654 private HostSelector fHostSelector = null;
// --- runtime state: queued messages, error back-off deadline, sender executor ---
656 private final LinkedBlockingQueue<TimestampedMessage> fPending;
657 private long fDontSendUntilMs;
658 private final ScheduledThreadPoolExecutor fExec;
// --- DME2 settings, populated from props in DME2Configue() ---
660 private String latitude;
661 private String longitude;
662 private String version;
663 private String serviceName;
665 private String partner;
666 private String routeOffer;
667 private String subContextPath;
668 private String protocol;
669 private String methodType;
671 private String dmeuser;
672 private String dmepassword;
673 private String contentType;
// back-off (ms) added to the current time after a failed send
674 private static final long sfWaitAfterError = 10000;
675 private HashMap<String, String> DMETimeOuts;
676 private DME2Client sender;
// transport selector; defaults to DME2
// NOTE(review): public mutable field alongside getProtocolFlag/setProtocolFlag.
677 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
678 private String authKey;
679 private String authDate;
680 private String handlers;
681 private Properties props;
// NOTE(review): public mutable statics are shared by every publisher instance.
682 public static String routerFilePath;
683 protected static final Map<String, String> headers = new HashMap<String, String>();
684 public static MultivaluedMap<String, Object> headersMap;
// response object reused and updated by sendBatchWithResponse()
686 private MRPublisherResponse pubResponse;
// Getter for the publisher response object.
// NOTE(review): body not visible in this extract; presumably returns pubResponse — confirm.
688 public MRPublisherResponse getPubResponse() {
// Replaces the response object that sendBatchWithResponse() updates.
692 public void setPubResponse(MRPublisherResponse pubResponse) {
693 this.pubResponse = pubResponse;
// Returns the static router file path (also assigned from the
// "DME2preferredRouterFilePath" property in DME2Configue()).
696 public static String getRouterFilePath() {
697 return routerFilePath;
// Sets the static router file path shared by all instances of this class.
700 public static void setRouterFilePath(String routerFilePath) {
701 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
// Getter for the client properties.
// NOTE(review): body not visible in this extract; presumably returns props — confirm.
704 public Properties getProps() {
// Applies new client properties: the visible line derives a client configuration via
// DmaapClientUtil.getClientConfig(props) and passes it to setClientConfig.
// NOTE(review): a line is missing from this extract; presumably it stores props in
// the field of the same name — confirm.
708 public void setProps(Properties props) {
710 setClientConfig(DmaapClientUtil.getClientConfig(props));
// Getter for the transport selector.
// NOTE(review): body not visible in this extract; presumably returns protocolFlag — confirm.
713 public String getProtocolFlag() {
// Selects the transport that sendBatch()/sendBatchWithResponse() compare against the
// ProtocolTypeConstants values (DME2, AUTH_KEY, AAF_AUTH, HTTPNOAUTH).
717 public void setProtocolFlag(String protocolFlag) {
718 this.protocolFlag = protocolFlag;
// Reads the DME2 settings from props, builds the DME2 service URL, sets the AFT_*
// JVM system properties and creates/configures the DME2Client stored in 'sender'.
// Several lines are missing from the reviewed extract; the notes describe only the
// visible lines.
721 private void DME2Configue() throws Exception {
// Copy the DME2-related settings out of the properties.
724 latitude = props.getProperty("Latitude");
725 longitude = props.getProperty("Longitude");
726 version = props.getProperty("Version");
727 serviceName = props.getProperty("ServiceName");
728 env = props.getProperty("Environment");
729 partner = props.getProperty("Partner");
730 routeOffer = props.getProperty("routeOffer");
// NOTE(review): a missing "SubContextPath" property yields the text "null" + topic.
731 subContextPath = props.getProperty("SubContextPath") + fTopic;
733 protocol = props.getProperty("Protocol");
734 methodType = props.getProperty("MethodType");
735 dmeuser = props.getProperty("username");
736 dmepassword = props.getProperty("password");
737 contentType = props.getProperty("contenttype");
738 handlers = props.getProperty("sessionstickinessrequired");
739 routerFilePath = props.getProperty("DME2preferredRouterFilePath");
742 * Changes to DME2Client url to use Partner for auto failover
743 * between data centers When Partner value is not provided use the
744 * routeOffer value for auto failover within a cluster
747 String partitionKey = props.getProperty("partition");
// URL form depends on which of partner / routeOffer is set; a non-empty partition
// key is appended as an extra query parameter in both forms.
749 if (partner != null && !partner.isEmpty()) {
750 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
752 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
753 url = url + "&partitionKey=" + partitionKey;
755 } else if (routeOffer != null && !routeOffer.isEmpty()) {
756 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
758 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
759 url = url + "&partitionKey=" + partitionKey;
// Timeout values and the content type travel as request headers.
763 DMETimeOuts = new HashMap<String, String>();
764 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
765 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
766 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
767 DMETimeOuts.put("Content-Type", contentType);
// NOTE(review): System.setProperty rejects null values, so a missing Latitude,
// Longitude or AFT_ENVIRONMENT property fails here with NullPointerException.
// These are JVM-wide settings, not per-publisher ones.
768 System.setProperty("AFT_LATITUDE", latitude);
769 System.setProperty("AFT_LONGITUDE", longitude);
770 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
771 // System.setProperty("DME2.DEBUG", "true");
774 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
776 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
777 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
// NOTE(review): hard-coded keystore password; should come from configuration.
778 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
// Create the client with a 5000 ms argument and apply method, sub-context,
// credentials and headers.
782 sender = new DME2Client(new URI(url), 5000L);
784 sender.setAllowAllHttpReturnCodes(true);
785 sender.setMethod(methodType);
786 sender.setSubContext(subContextPath);
787 sender.setCredentials(dmeuser, dmepassword);
788 sender.setHeaders(DMETimeOuts);
// Extra handler headers are added only when "sessionstickinessrequired" is "yes".
789 if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
790 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
791 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
792 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
793 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
794 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
796 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
// NOTE(review): each handler logs and rethrows a fresh exception built from the
// message only, so the original cause and stack trace are lost to the caller.
798 } catch (DME2Exception x) {
799 getLog().warn(x.getMessage(), x);
800 throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
801 } catch (URISyntaxException x) {
803 getLog().warn(x.getMessage(), x);
804 throw new URISyntaxException(url, x.getMessage());
805 } catch (Exception x) {
807 getLog().warn(x.getMessage(), x);
808 throw new IllegalArgumentException(x.getMessage());
// Five-argument constructor: validates the topic, creates the host selector, stores
// the batching settings and creates the queue, executor and response object.
// Unlike the seven-argument constructor, no periodic task is scheduled in the
// visible lines.
// NOTE(review): some lines are missing from this extract (no fTopic assignment or
// super call is visible) — confirm against the full file.
812 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
813 boolean compress) throws MalformedURLException {
816 if (topic == null || topic.length() < 1) {
817 throw new IllegalArgumentException("A topic must be provided.");
820 fHostSelector = new HostSelector(hosts, null);
823 fMaxBatchSize = maxBatchSize;
824 fMaxBatchAgeMs = maxBatchAgeMs;
825 fCompress = compress;
827 fPending = new LinkedBlockingQueue<>();
828 fDontSendUntilMs = 0;
829 fExec = new ScheduledThreadPoolExecutor(1);
830 pubResponse = new MRPublisherResponse();
// Seven-argument constructor used by Builder.build(): validates the topic, stores the
// batching settings and schedules a fixed-rate task on a single-thread executor
// (first run after 100 ms, then every threadOccuranceTime ms).
// allowSelfSignedCerts is not referenced in the visible lines.
// NOTE(review): the Runnable body is not visible in this extract; presumably it calls
// send(false) — confirm.
834 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
835 boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
838 if (topic == null || topic.length() < 1) {
839 throw new IllegalArgumentException("A topic must be provided.");
842 fHostSelector = new HostSelector(hosts, null);
845 fMaxBatchSize = maxBatchSize;
846 fMaxBatchAgeMs = maxBatchAgeMs;
847 fCompress = compress;
848 threadOccuranceTime = httpThreadOccurnace;
849 fPending = new LinkedBlockingQueue<>();
850 fDontSendUntilMs = 0;
851 fExec = new ScheduledThreadPoolExecutor(1);
852 fExec.scheduleAtFixedRate(new Runnable() {
857 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
858 pubResponse = new MRPublisherResponse();
// A message stamped with the time it was queued; shouldSendNow() reads the stamp of
// the queue head to decide when the batch has aged enough.
// NOTE(review): a constructor line is missing from this extract; presumably it copies
// the wrapped message via the superclass constructor — confirm.
861 private static class TimestampedMessage extends message {
862 public TimestampedMessage(message m) {
864 timestamp = Clock.now();
867 public final long timestamp;
// Getter for the user name.
// NOTE(review): body not visible in this extract; presumably returns username — confirm.
870 public String getUsername() {
// Sets the user name passed to the authenticated post calls.
874 public void setUsername(String username) {
875 this.username = username;
// Getter for the password.
// NOTE(review): body not visible in this extract; presumably returns password — confirm.
878 public String getPassword() {
// Sets the password passed to the authenticated post calls.
882 public void setPassword(String password) {
883 this.password = password;
// Getter for the target host.
// NOTE(review): body not visible in this extract; presumably returns host — confirm.
886 public String getHost() {
// Setter for the target host.
// NOTE(review): body not visible in this extract; presumably assigns the host field — confirm.
890 public void setHost(String host) {
// Getter for the content type field.
// NOTE(review): body not visible in this extract; presumably returns contentType — confirm.
894 public String getContentType() {
// Sets the contentType field. Note that sendBatch() and sendBatchWithResponse() read
// the content type from the "contenttype" property into a local variable instead.
898 public void setContentType(String contentType) {
899 this.contentType = contentType;
// Getter for the API auth key.
// NOTE(review): body not visible in this extract; presumably returns authKey — confirm.
902 public String getAuthKey() {
// Sets the auth key used by the AUTH_KEY transport.
906 public void setAuthKey(String authKey) {
907 this.authKey = authKey;
// Getter for the auth date.
// NOTE(review): body not visible in this extract; presumably returns authDate — confirm.
910 public String getAuthDate() {
// Sets the auth date sent together with the auth key by the AUTH_KEY transport.
914 public void setAuthDate(String authDate) {
915 this.authDate = authDate;