1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
41 import javax.ws.rs.core.MultivaluedMap;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import org.onap.dmaap.mr.client.HostSelector;
55 import org.onap.dmaap.mr.client.MRBatchingPublisher;
56 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
57 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60 private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
62 private static final String PROPS_PROTOCOL = "Protocol";
63 private static final String PROPS_PARTITION = "partition";
64 private static final String PROPS_CONTENT_TYPE = "contenttype";
66 private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip";
67 private static final String CONTENT_TYPE_CAMBRIA = "application/cambria";
68 private static final String CONTENT_TYPE_JSON = "application/json";
69 private static final String CONTENT_TYPE_TEXT = "text/plain";
71 private static final String JSON_STATUS = "status";
73 public static class Builder {
75 public Builder againstUrls(Collection<String> baseUrls) {
80 public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )
83 fServiceName = serviceName;
84 fTransportype = transportype;
88 public Builder onTopic(String topic) {
93 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
94 fMaxBatchSize = maxBatchSize;
95 fMaxBatchAgeMs = maxBatchAgeMs;
99 public Builder compress(boolean compress) {
100 fCompress = compress;
104 public Builder httpThreadTime(int threadOccuranceTime) {
105 this.threadOccuranceTime = threadOccuranceTime;
109 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
110 fAllowSelfSignedCerts = allowSelfSignedCerts;
114 public Builder withResponse(boolean withResponse) {
115 fWithResponse = withResponse;
119 public MRSimplerBatchPublisher build() {
120 if (!fWithResponse) {
122 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
123 fAllowSelfSignedCerts, threadOccuranceTime);
124 } catch (MalformedURLException e) {
125 throw new IllegalArgumentException(e);
129 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
130 fAllowSelfSignedCerts, fMaxBatchSize);
131 } catch (MalformedURLException e) {
132 throw new IllegalArgumentException(e);
138 private Collection<String> fUrls;
139 private Collection<String> fServiceName;
140 private String fTransportype;
141 private String fTopic;
142 private int fMaxBatchSize = 100;
144 private long fMaxBatchAgeMs = 1000;
145 private boolean fCompress = false;
146 private int threadOccuranceTime = 50;
147 private boolean fAllowSelfSignedCerts = false;
148 private boolean fWithResponse = false;
153 public int send(String partition, String msg) {
154 return send(new message(partition, msg));
158 public int send(String msg) {
159 return send(new message(null, msg));
163 public int send(message msg) {
164 final LinkedList<message> list = new LinkedList<>();
170 public synchronized int send(Collection<message> msgs) {
172 throw new IllegalStateException("The publisher was closed.");
175 for (message userMsg : msgs) {
176 fPending.add(new TimestampedMessage(userMsg));
178 return getPendingMessageCount();
182 public synchronized int getPendingMessageCount() {
183 return fPending.size();
187 public void close() {
189 final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
190 if (remains.isEmpty()) {
191 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
194 } catch (InterruptedException e) {
195 getLog().warn("Possible message loss. " + e.getMessage(), e);
196 Thread.currentThread().interrupt();
197 } catch (IOException e) {
198 getLog().warn("Possible message loss. " + e.getMessage(), e);
203 public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
204 synchronized (this) {
207 // stop the background sender
208 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
209 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
213 final long now = Clock.now();
214 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
215 final long timeoutAtMs = now + waitInMs;
217 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
222 synchronized (this) {
223 final LinkedList<message> result = new LinkedList<>();
224 fPending.drainTo(result);
230 * Possibly send a batch to the MR server. This is called by the background
231 * thread and the close() method
235 private synchronized void send(boolean force) {
236 if ((force || shouldSendNow()) && !sendBatch()) {
237 getLog().warn("Send failed, " + fPending.size() + " message to send.");
239 // note the time for back-off
240 fDontSendUntilMs = sfWaitAfterError + Clock.now();
244 private synchronized boolean shouldSendNow() {
245 boolean shouldSend = false;
246 if (fPending.size()>0) {
247 final long nowMs = Clock.now();
249 shouldSend = (fPending.size() >= fMaxBatchSize);
251 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
252 shouldSend = sendAtMs <= nowMs;
255 // however, wait after an error
256 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
262 * Method to parse published JSON Objects and Arrays
266 private JSONArray parseJSON() {
267 JSONArray jsonArray = new JSONArray();
268 for (TimestampedMessage m : fPending) {
269 JSONTokener jsonTokener = new JSONTokener(m.fMsg);
270 JSONObject jsonObject = null;
271 JSONArray tempjsonArray = null;
272 final char firstChar = jsonTokener.next();
274 if ('[' == firstChar) {
275 tempjsonArray = new JSONArray(jsonTokener);
276 if (null != tempjsonArray) {
277 for (int i = 0; i < tempjsonArray.length(); i++) {
278 jsonArray.put(tempjsonArray.getJSONObject(i));
282 jsonObject = new JSONObject(jsonTokener);
283 jsonArray.put(jsonObject);
290 private void logTime(long startMs, String dmeResponse) {
291 if (getLog().isInfoEnabled()) {
292 getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse);
296 private synchronized boolean sendBatch() {
297 // it's possible for this call to be made with an empty list. in this
298 // case, just return.
299 if (fPending.isEmpty()) {
303 final long nowMs = Clock.now();
305 if (this.fHostSelector != null) {
306 host = this.fHostSelector.selectBaseHost();
309 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
310 props.getProperty(PROPS_PARTITION));
314 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
315 OutputStream os = baseStream;
316 final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
317 if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
318 JSONArray jsonArray = parseJSON();
319 os.write(jsonArray.toString().getBytes());
322 } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
323 for (TimestampedMessage m : fPending) {
324 os.write(m.fMsg.getBytes());
328 } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
329 || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
330 if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
331 os = new GZIPOutputStream(baseStream);
333 for (TimestampedMessage m : fPending) {
335 os.write(("" + m.fPartition.length()).getBytes());
337 os.write(("" + m.fMsg.length()).getBytes());
339 os.write(m.fPartition.getBytes());
340 os.write(m.fMsg.getBytes());
345 for (TimestampedMessage m : fPending) {
346 os.write(m.fMsg.getBytes());
352 final long startMs = Clock.now();
353 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
359 .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
360 nowMs - fPending.peek().timestamp));
361 sender.setPayload(os.toString());
362 String dmeResponse = sender.sendAndWait(5000L);
364 logTime(startMs, dmeResponse);
369 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
370 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
371 nowMs - fPending.peek().timestamp);
372 final JSONObject result =
373 postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
374 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
375 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
376 // Here we are checking for error response. If HTTP status
377 // code is not within the http success response code
378 // then we consider this as error and return false
379 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
382 logTime(startMs, result.toString());
387 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
388 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
389 nowMs - fPending.peek().timestamp);
390 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
393 // Here we are checking for error response. If HTTP status
394 // code is not within the http success response code
395 // then we consider this as error and return false
396 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
399 logTime(startMs, result.toString());
404 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
405 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
406 nowMs - fPending.peek().timestamp);
407 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
409 // Here we are checking for error response. If HTTP status
410 // code is not within the http success response code
411 // then we consider this as error and return false
412 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
415 logTime(startMs, result.toString());
419 } catch (Exception x) {
420 getLog().warn(x.getMessage(), x);
425 public synchronized MRPublisherResponse sendBatchWithResponse() {
426 // it's possible for this call to be made with an empty list. in this
427 // case, just return.
428 if (fPending.isEmpty()) {
429 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
430 pubResponse.setResponseMessage("No Messages to send");
434 final long nowMs = Clock.now();
436 host = this.fHostSelector.selectBaseHost();
438 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
439 props.getProperty(PROPS_PARTITION));
440 OutputStream os = null;
443 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
445 final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
446 if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
447 JSONArray jsonArray = parseJSON();
448 os.write(jsonArray.toString().getBytes());
449 } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
450 for (TimestampedMessage m : fPending) {
451 os.write(m.fMsg.getBytes());
454 } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
455 || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
456 if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
457 os = new GZIPOutputStream(baseStream);
459 for (TimestampedMessage m : fPending) {
461 os.write(("" + m.fPartition.length()).getBytes());
463 os.write(("" + m.fMsg.length()).getBytes());
465 os.write(m.fPartition.getBytes());
466 os.write(m.fMsg.getBytes());
471 for (TimestampedMessage m : fPending) {
472 os.write(m.fMsg.getBytes());
477 final long startMs = Clock.now();
478 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
484 getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
485 nowMs - fPending.peek().timestamp);
486 sender.setPayload(os.toString());
488 String dmeResponse = sender.sendAndWait(5000L);
490 pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
492 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
493 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
497 final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
498 getLog().info(logLine);
501 } catch (DME2Exception x) {
502 getLog().warn(x.getMessage(), x);
503 pubResponse.setResponseCode(x.getErrorCode());
504 pubResponse.setResponseMessage(x.getErrorMessage());
505 } catch (URISyntaxException x) {
507 getLog().warn(x.getMessage(), x);
508 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
509 pubResponse.setResponseMessage(x.getMessage());
510 } catch (Exception x) {
512 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
513 pubResponse.setResponseMessage(x.getMessage());
514 logger.error("exception: ", x);
521 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
522 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
523 nowMs - fPending.peek().timestamp);
524 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
525 authDate, username, password, protocolFlag);
526 // Here we are checking for error response. If HTTP status
527 // code is not within the http success response code
528 // then we consider this as error and return false
530 pubResponse = createMRPublisherResponse(result, pubResponse);
532 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
533 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
538 logTime(startMs, result);
543 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
544 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
545 nowMs - fPending.peek().timestamp);
546 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
547 password, protocolFlag);
549 // Here we are checking for error response. If HTTP status
550 // code is not within the http success response code
551 // then we consider this as error and return false
552 pubResponse = createMRPublisherResponse(result, pubResponse);
554 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
555 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
560 final String logLine = String.valueOf((Clock.now() - startMs));
561 getLog().info(logLine);
566 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
567 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
568 nowMs - fPending.peek().timestamp);
569 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
571 // Here we are checking for error response. If HTTP status
572 // code is not within the http success response code
573 // then we consider this as error and return false
574 pubResponse = createMRPublisherResponse(result, pubResponse);
576 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
577 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
582 final String logLine = String.valueOf((Clock.now() - startMs));
583 getLog().info(logLine);
587 } catch (IllegalArgumentException | HttpException x) {
588 getLog().warn(x.getMessage(), x);
589 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
590 pubResponse.setResponseMessage(x.getMessage());
592 } catch (IOException x) {
593 getLog().warn(x.getMessage(), x);
594 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
595 pubResponse.setResponseMessage(x.getMessage());
596 } catch (Exception x) {
597 getLog().warn(x.getMessage(), x);
599 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
600 pubResponse.setResponseMessage(x.getMessage());
605 if (!fPending.isEmpty()) {
606 getLog().warn("Send failed, " + fPending.size() + " message to send.");
607 pubResponse.setPendingMsgs(fPending.size());
612 } catch (Exception x) {
613 getLog().warn(x.getMessage(), x);
614 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
615 pubResponse.setResponseMessage("Error in closing Output Stream");
623 public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
625 if (reply.isEmpty()) {
627 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
628 mrPubResponse.setResponseMessage("Please verify the Producer properties");
629 } else if (reply.startsWith("{")) {
630 JSONObject jObject = new JSONObject(reply);
631 if (jObject.has("message") && jObject.has(JSON_STATUS)) {
632 String message = jObject.getString("message");
633 if (null != message) {
634 mrPubResponse.setResponseMessage(message);
636 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
638 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
639 mrPubResponse.setResponseMessage(reply);
641 } else if (reply.startsWith("<")) {
642 String responseCode = getHTTPErrorResponseCode(reply);
643 if (responseCode.contains("403")) {
644 responseCode = "403";
646 mrPubResponse.setResponseCode(responseCode);
647 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
650 return mrPubResponse;
653 private final String fTopic;
654 private final int fMaxBatchSize;
655 private final long fMaxBatchAgeMs;
656 private final boolean fCompress;
657 private int threadOccuranceTime;
658 private boolean fClosed;
659 private String username;
660 private String password;
664 private HostSelector fHostSelector = null;
666 private final LinkedBlockingQueue<TimestampedMessage> fPending;
667 private long fDontSendUntilMs;
668 private final ScheduledThreadPoolExecutor fExec;
670 private String latitude;
671 private String longitude;
672 private String version;
673 private String serviceName;
675 private String partner;
676 private String routeOffer;
677 private String subContextPath;
678 private String protocol;
679 private String methodType;
681 private String dmeuser;
682 private String dmepassword;
683 private String contentType;
684 private static final long sfWaitAfterError = 10000;
685 private HashMap<String, String> DMETimeOuts;
686 private DME2Client sender;
687 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
688 private String authKey;
689 private String authDate;
690 private String handlers;
691 private Properties props;
692 public static String routerFilePath;
693 protected static final Map<String, String> headers = new HashMap<String, String>();
694 public static MultivaluedMap<String, Object> headersMap;
696 private MRPublisherResponse pubResponse;
698 public MRPublisherResponse getPubResponse() {
702 public void setPubResponse(MRPublisherResponse pubResponse) {
703 this.pubResponse = pubResponse;
706 public static String getRouterFilePath() {
707 return routerFilePath;
710 public static void setRouterFilePath(String routerFilePath) {
711 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
714 public Properties getProps() {
718 public void setProps(Properties props) {
720 setClientConfig(DmaapClientUtil.getClientConfig(props));
723 public String getProtocolFlag() {
727 public void setProtocolFlag(String protocolFlag) {
728 this.protocolFlag = protocolFlag;
731 private void DME2Configue() throws Exception {
734 latitude = props.getProperty("Latitude");
735 longitude = props.getProperty("Longitude");
736 version = props.getProperty("Version");
737 serviceName = props.getProperty("ServiceName");
738 env = props.getProperty("Environment");
739 partner = props.getProperty("Partner");
740 routeOffer = props.getProperty("routeOffer");
741 subContextPath = props.getProperty("SubContextPath") + fTopic;
743 protocol = props.getProperty(PROPS_PROTOCOL);
744 methodType = props.getProperty("MethodType");
745 dmeuser = props.getProperty("username");
746 dmepassword = props.getProperty("password");
747 contentType = props.getProperty(PROPS_CONTENT_TYPE);
748 handlers = props.getProperty("sessionstickinessrequired");
749 routerFilePath = props.getProperty("DME2preferredRouterFilePath");
752 * Changes to DME2Client url to use Partner for auto failover
753 * between data centers When Partner value is not provided use the
754 * routeOffer value for auto failover within a cluster
757 String partitionKey = props.getProperty(PROPS_PARTITION);
759 if (partner != null && !partner.isEmpty()) {
760 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
762 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
763 url = url + "&partitionKey=" + partitionKey;
765 } else if (routeOffer != null && !routeOffer.isEmpty()) {
766 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
768 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
769 url = url + "&partitionKey=" + partitionKey;
773 DMETimeOuts = new HashMap<String, String>();
774 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
775 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
776 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
777 DMETimeOuts.put("Content-Type", contentType);
778 System.setProperty("AFT_LATITUDE", latitude);
779 System.setProperty("AFT_LONGITUDE", longitude);
780 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
781 // System.setProperty("DME2.DEBUG", "true");
784 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
786 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
787 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
788 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
792 sender = new DME2Client(new URI(url), 5000L);
794 sender.setAllowAllHttpReturnCodes(true);
795 sender.setMethod(methodType);
796 sender.setSubContext(subContextPath);
797 sender.setCredentials(dmeuser, dmepassword);
798 sender.setHeaders(DMETimeOuts);
799 if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
800 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
801 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
802 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
803 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
804 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
806 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
808 } catch (DME2Exception x) {
809 getLog().warn(x.getMessage(), x);
810 throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
811 } catch (URISyntaxException x) {
813 getLog().warn(x.getMessage(), x);
814 throw new URISyntaxException(url, x.getMessage());
815 } catch (Exception x) {
817 getLog().warn(x.getMessage(), x);
818 throw new IllegalArgumentException(x.getMessage());
822 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
823 boolean compress) throws MalformedURLException {
826 if (topic == null || topic.length() < 1) {
827 throw new IllegalArgumentException("A topic must be provided.");
830 fHostSelector = new HostSelector(hosts, null);
833 fMaxBatchSize = maxBatchSize;
834 fMaxBatchAgeMs = maxBatchAgeMs;
835 fCompress = compress;
837 fPending = new LinkedBlockingQueue<>();
838 fDontSendUntilMs = 0;
839 fExec = new ScheduledThreadPoolExecutor(1);
840 pubResponse = new MRPublisherResponse();
844 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
845 boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
848 if (topic == null || topic.length() < 1) {
849 throw new IllegalArgumentException("A topic must be provided.");
852 fHostSelector = new HostSelector(hosts, null);
855 fMaxBatchSize = maxBatchSize;
856 fMaxBatchAgeMs = maxBatchAgeMs;
857 fCompress = compress;
858 threadOccuranceTime = httpThreadOccurnace;
859 fPending = new LinkedBlockingQueue<>();
860 fDontSendUntilMs = 0;
861 fExec = new ScheduledThreadPoolExecutor(1);
862 fExec.scheduleAtFixedRate(new Runnable() {
867 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
868 pubResponse = new MRPublisherResponse();
871 private static class TimestampedMessage extends message {
872 public TimestampedMessage(message m) {
874 timestamp = Clock.now();
877 public final long timestamp;
880 public String getUsername() {
884 public void setUsername(String username) {
885 this.username = username;
888 public String getPassword() {
892 public void setPassword(String password) {
893 this.password = password;
896 public String getHost() {
900 public void setHost(String host) {
904 public String getContentType() {
908 public void setContentType(String contentType) {
909 this.contentType = contentType;
912 public String getAuthKey() {
916 public void setAuthKey(String authKey) {
917 this.authKey = authKey;
920 public String getAuthDate() {
924 public void setAuthDate(String authDate) {
925 this.authDate = authDate;