1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2021 Orange.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
25 package org.onap.dmaap.mr.client.impl;
27 import com.att.aft.dme2.api.DME2Client;
28 import com.att.aft.dme2.api.DME2Exception;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.net.MalformedURLException;
34 import java.net.URISyntaxException;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.LinkedList;
38 import java.util.List;
40 import java.util.Properties;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.zip.GZIPOutputStream;
46 import javax.ws.rs.core.MultivaluedMap;
47 import org.apache.http.HttpException;
48 import org.apache.http.HttpStatus;
49 import org.json.JSONArray;
50 import org.json.JSONObject;
51 import org.json.JSONTokener;
52 import org.onap.dmaap.mr.client.HostSelector;
53 import org.onap.dmaap.mr.client.MRBatchingPublisher;
54 import org.onap.dmaap.mr.client.ProtocolType;
55 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60 private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
62 private static final String PASSWORD = "password";
63 private static final String USERNAME = "username";
64 private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
65 private static final String SERVICE_NAME = "ServiceName";
66 private static final String PARTNER = "Partner";
67 private static final String ROUTE_OFFER = "routeOffer";
68 private static final String PROTOCOL = "Protocol";
69 private static final String METHOD_TYPE = "MethodType";
70 private static final String CONTENT_TYPE = "contenttype";
71 private static final String LATITUDE = "Latitude";
72 private static final String LONGITUDE = "Longitude";
73 private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
74 private static final String VERSION = "Version";
75 private static final String ENVIRONMENT = "Environment";
76 private static final String SUB_CONTEXT_PATH = "SubContextPath";
77 private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
78 private static final String PARTITION = "partition";
79 private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
80 private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
81 private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
82 private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
83 private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
84 private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
86 private static final String CONTENT_TYPE_TEXT = "text/plain";
88 private static final String JSON_STATUS = "status";
90 public static class Builder {
92 public Builder againstUrls(Collection<String> baseUrls) {
97 public Builder againstUrlsOrServiceName(Collection<String> baseUrls, Collection<String> serviceName, String transportype) {
102 public Builder onTopic(String topic) {
107 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
108 fMaxBatchSize = maxBatchSize;
109 fMaxBatchAgeMs = maxBatchAgeMs;
113 public Builder compress(boolean compress) {
114 fCompress = compress;
118 public Builder httpThreadTime(int threadOccurrenceTime) {
119 this.threadOccurrenceTime = threadOccurrenceTime;
123 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
124 fAllowSelfSignedCerts = allowSelfSignedCerts;
128 public Builder withResponse(boolean withResponse) {
129 fWithResponse = withResponse;
133 public MRSimplerBatchPublisher build() {
134 if (!fWithResponse) {
136 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
137 fAllowSelfSignedCerts, threadOccurrenceTime);
138 } catch (MalformedURLException e) {
139 throw new IllegalArgumentException(e);
143 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
144 fAllowSelfSignedCerts, fMaxBatchSize);
145 } catch (MalformedURLException e) {
146 throw new IllegalArgumentException(e);
152 private Collection<String> fUrls;
153 private String fTopic;
154 private int fMaxBatchSize = 100;
156 private long fMaxBatchAgeMs = 1000;
157 private boolean fCompress = false;
158 private int threadOccurrenceTime = 50;
159 private boolean fAllowSelfSignedCerts = false;
160 private boolean fWithResponse = false;
165 public int send(String partition, String msg) {
166 return send(new Message(partition, msg));
170 public int send(String msg) {
171 return send(new Message(null, msg));
175 public int send(Message msg) {
176 final LinkedList<Message> list = new LinkedList<>();
182 public synchronized int send(Collection<Message> msgs) {
184 throw new IllegalStateException("The publisher was closed.");
187 for (Message userMsg : msgs) {
188 fPending.add(new TimestampedMessage(userMsg));
190 return getPendingMessageCount();
194 public synchronized int getPendingMessageCount() {
195 return fPending.size();
199 public void close() {
201 final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
202 if (remains.isEmpty()) {
203 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
206 } catch (InterruptedException e) {
207 getLog().warn("Possible message loss. " + e.getMessage(), e);
208 Thread.currentThread().interrupt();
209 } catch (IOException e) {
210 getLog().warn("Possible message loss. " + e.getMessage(), e);
215 public List<Message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
216 synchronized (this) {
219 // stop the background sender
220 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
221 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
225 final long now = Clock.now();
226 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
227 final long timeoutAtMs = now + waitInMs;
229 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
234 synchronized (this) {
235 final LinkedList<Message> result = new LinkedList<>();
236 fPending.drainTo(result);
242 * Possibly send a batch to the MR server. This is called by the background
243 * thread and the close() method
247 private synchronized void send(boolean force) {
248 if ((force || shouldSendNow()) && !sendBatch()) {
249 getLog().warn("Send failed, {} message to send.", fPending.size());
250 // note the time for back-off
251 fDontSendUntilMs = SF_WAIT_AFTER_ERROR + Clock.now();
255 private synchronized boolean shouldSendNow() {
256 boolean shouldSend = false;
257 if (!fPending.isEmpty()) {
258 final long nowMs = Clock.now();
260 shouldSend = (fPending.size() >= fMaxBatchSize);
262 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
263 shouldSend = sendAtMs <= nowMs;
266 // however, wait after an error
267 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
273 * Method to parse published JSON Objects and Arrays.
277 private JSONArray parseJSON() {
278 JSONArray jsonArray = new JSONArray();
279 for (TimestampedMessage m : fPending) {
280 JSONTokener jsonTokener = new JSONTokener(m.fMsg);
281 JSONObject jsonObject = null;
282 JSONArray tempjsonArray = null;
283 final char firstChar = jsonTokener.next();
285 if ('[' == firstChar) {
286 tempjsonArray = new JSONArray(jsonTokener);
287 for (int i = 0; i < tempjsonArray.length(); i++) {
288 jsonArray.put(tempjsonArray.getJSONObject(i));
291 jsonObject = new JSONObject(jsonTokener);
292 jsonArray.put(jsonObject);
299 private void logTime(long startMs, String dmeResponse) {
300 if (getLog().isInfoEnabled()) {
301 getLog().info("MR reply ok ({} ms):{}", (Clock.now() - startMs), dmeResponse);
305 private void logSendMessage(int nbMessage, String dest, long time) {
306 if (getLog().isInfoEnabled()) {
307 getLog().info("sending {} msgs to {}. Oldest: {} ms", nbMessage, dest, time);
311 private synchronized boolean sendBatch() {
312 // it's possible for this call to be made with an empty list. in this
313 // case, just return.
314 if (fPending.isEmpty()) {
318 final long nowMs = Clock.now();
320 if (this.fHostSelector != null) {
321 host = this.fHostSelector.selectBaseHost();
324 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL),
325 props.getProperty(PARTITION));
329 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
330 OutputStream os = baseStream;
331 final String contentType = props.getProperty(CONTENT_TYPE);
332 if (contentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
333 JSONArray jsonArray = parseJSON();
334 os.write(jsonArray.toString().getBytes());
337 } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
338 for (TimestampedMessage m : fPending) {
339 os.write(m.fMsg.getBytes());
343 } else if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
344 || (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
345 if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
346 os = new GZIPOutputStream(baseStream);
348 for (TimestampedMessage m : fPending) {
350 os.write(("" + m.fPartition.length()).getBytes());
352 os.write(("" + m.fMsg.length()).getBytes());
354 os.write(m.fPartition.getBytes());
355 os.write(m.fMsg.getBytes());
360 for (TimestampedMessage m : fPending) {
361 os.write(m.fMsg.getBytes());
367 final long startMs = Clock.now();
368 if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
373 if (fPending.peek() != null) {
374 logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
376 sender.setPayload(os.toString());
377 String dmeResponse = sender.sendAndWait(5000L);
379 logTime(startMs, dmeResponse);
384 if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
385 if (fPending.peek() != null) {
386 logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
388 final JSONObject result =
389 postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
390 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
391 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
392 // Here we are checking for error response. If HTTP status
393 // code is not within the http success response code
394 // then we consider this as error and return false
395 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
398 logTime(startMs, result.toString());
403 if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
404 if (fPending.peek() != null) {
405 logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
407 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
410 // Here we are checking for error response. If HTTP status
411 // code is not within the http success response code
412 // then we consider this as error and return false
413 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
416 logTime(startMs, result.toString());
421 if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
422 if (fPending.peek() != null) {
423 logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
425 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
427 // Here we are checking for error response. If HTTP status
428 // code is not within the http success response code
429 // then we consider this as error and return false
430 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
433 logTime(startMs, result.toString());
437 } catch (InterruptedException e) {
438 getLog().warn("Interrupted!", e);
439 // Restore interrupted state...
440 Thread.currentThread().interrupt();
441 } catch (Exception x) {
442 getLog().warn(x.getMessage(), x);
447 public synchronized MRPublisherResponse sendBatchWithResponse() {
448 // it's possible for this call to be made with an empty list. in this
449 // case, just return.
450 if (fPending.isEmpty()) {
451 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
452 pubResponse.setResponseMessage("No Messages to send");
456 final long nowMs = Clock.now();
458 host = this.fHostSelector.selectBaseHost();
460 final String httpUrl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROTOCOL),
461 props.getProperty(PARTITION));
462 OutputStream os = null;
463 try (ByteArrayOutputStream baseStream = new ByteArrayOutputStream()) {
465 final String propsContentType = props.getProperty(CONTENT_TYPE);
466 if (propsContentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
467 JSONArray jsonArray = parseJSON();
468 os.write(jsonArray.toString().getBytes());
469 } else if (propsContentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
470 for (TimestampedMessage m : fPending) {
471 os.write(m.fMsg.getBytes());
474 } else if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
475 || (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
476 if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
477 os = new GZIPOutputStream(baseStream);
479 for (TimestampedMessage m : fPending) {
480 os.write(("" + m.fPartition.length()).getBytes());
482 os.write(("" + m.fMsg.length()).getBytes());
484 os.write(m.fPartition.getBytes());
485 os.write(m.fMsg.getBytes());
490 for (TimestampedMessage m : fPending) {
491 os.write(m.fMsg.getBytes());
496 final long startMs = Clock.now();
497 if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
504 if (fPending.peek() != null) {
505 logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
507 sender.setPayload(os.toString());
509 String dmeResponse = sender.sendAndWait(5000L);
511 pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
513 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
514 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
518 final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
519 getLog().info(logLine);
522 } catch (DME2Exception x) {
523 getLog().warn(x.getMessage(), x);
524 pubResponse.setResponseCode(x.getErrorCode());
525 pubResponse.setResponseMessage(x.getErrorMessage());
526 } catch (URISyntaxException x) {
528 getLog().warn(x.getMessage(), x);
529 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
530 pubResponse.setResponseMessage(x.getMessage());
531 } catch (InterruptedException e) {
533 } catch (Exception x) {
535 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
536 pubResponse.setResponseMessage(x.getMessage());
537 logger.error("exception: ", x);
544 if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
545 if (fPending.peek() != null) {
546 logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
548 final String result = postAuthwithResponse(httpUrl, baseStream.toByteArray(), contentType, authKey,
549 authDate, username, password, protocolFlag);
550 // Here we are checking for error response. If HTTP status
551 // code is not within the http success response code
552 // then we consider this as error and return false
554 pubResponse = createMRPublisherResponse(result, pubResponse);
556 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
557 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
562 logTime(startMs, result);
567 if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
568 if (fPending.peek() != null) {
569 logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
571 final String result = postWithResponse(httpUrl, baseStream.toByteArray(), contentType, username,
572 password, protocolFlag);
574 // Here we are checking for error response. If HTTP status
575 // code is not within the http success response code
576 // then we consider this as error and return false
577 pubResponse = createMRPublisherResponse(result, pubResponse);
579 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
580 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
585 final String logLine = String.valueOf((Clock.now() - startMs));
586 getLog().info(logLine);
591 if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
592 if (fPending.peek() != null) {
593 logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
595 final String result = postNoAuthWithResponse(httpUrl, baseStream.toByteArray(), contentType);
597 // Here we are checking for error response. If HTTP status
598 // code is not within the http success response code
599 // then we consider this as error and return false
600 pubResponse = createMRPublisherResponse(result, pubResponse);
602 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
603 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
608 final String logLine = String.valueOf((Clock.now() - startMs));
609 getLog().info(logLine);
613 } catch (IllegalArgumentException | HttpException x) {
614 getLog().warn(x.getMessage(), x);
615 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
616 pubResponse.setResponseMessage(x.getMessage());
618 } catch (IOException x) {
619 getLog().warn(x.getMessage(), x);
620 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
621 pubResponse.setResponseMessage(x.getMessage());
622 } catch (InterruptedException e) {
623 getLog().warn("Interrupted!", e);
624 // Restore interrupted state...
625 Thread.currentThread().interrupt();
626 } catch (Exception x) {
627 getLog().warn(x.getMessage(), x);
629 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
630 pubResponse.setResponseMessage(x.getMessage());
633 if (!fPending.isEmpty()) {
634 getLog().warn("Send failed, " + fPending.size() + " message to send.");
635 pubResponse.setPendingMsgs(fPending.size());
640 } catch (Exception x) {
641 getLog().warn(x.getMessage(), x);
642 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
643 pubResponse.setResponseMessage("Error in closing Output Stream");
651 public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
653 if (reply.isEmpty()) {
655 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
656 mrPubResponse.setResponseMessage("Please verify the Producer properties");
657 } else if (reply.startsWith("{")) {
658 JSONObject jObject = new JSONObject(reply);
659 if (jObject.has("message") && jObject.has(JSON_STATUS)) {
660 String message = jObject.getString("message");
661 if (null != message) {
662 mrPubResponse.setResponseMessage(message);
664 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
666 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
667 mrPubResponse.setResponseMessage(reply);
669 } else if (reply.startsWith("<")) {
670 String responseCode = getHTTPErrorResponseCode(reply);
671 if (responseCode.contains("403")) {
672 responseCode = "403";
674 mrPubResponse.setResponseCode(responseCode);
675 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
678 return mrPubResponse;
681 private final String fTopic;
682 private final int fMaxBatchSize;
683 private final long fMaxBatchAgeMs;
684 private final boolean fCompress;
685 private int threadOccurrenceTime;
686 private boolean fClosed;
687 private String username;
688 private String password;
692 private HostSelector fHostSelector = null;
694 private final LinkedBlockingQueue<TimestampedMessage> fPending;
695 private long fDontSendUntilMs;
696 private final ScheduledThreadPoolExecutor fExec;
698 private String latitude;
699 private String longitude;
700 private String version;
701 private String serviceName;
703 private String partner;
704 private String routeOffer;
705 private String subContextPath;
706 private String protocol;
707 private String methodType;
709 private String dmeuser;
710 private String dmepassword;
711 private String contentType;
712 private static final long SF_WAIT_AFTER_ERROR = 10000;
713 private HashMap<String, String> DMETimeOuts;
714 private DME2Client sender;
715 public String protocolFlag = ProtocolType.DME2.getValue();
716 private String authKey;
717 private String authDate;
718 private String handlers;
719 private Properties props;
720 public static String routerFilePath;
721 protected static final Map<String, String> headers = new HashMap<String, String>();
722 public static MultivaluedMap<String, Object> headersMap;
724 private MRPublisherResponse pubResponse;
726 public MRPublisherResponse getPubResponse() {
730 public void setPubResponse(MRPublisherResponse pubResponse) {
731 this.pubResponse = pubResponse;
734 public static String getRouterFilePath() {
735 return routerFilePath;
738 public static void setRouterFilePath(String routerFilePath) {
739 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
742 public Properties getProps() {
746 public void setProps(Properties props) {
748 setClientConfig(DmaapClientUtil.getClientConfig(props));
751 public String getProtocolFlag() {
755 public void setProtocolFlag(String protocolFlag) {
756 this.protocolFlag = protocolFlag;
759 private void configureDME2() throws Exception {
762 latitude = props.getProperty(LATITUDE);
763 longitude = props.getProperty(LONGITUDE);
764 version = props.getProperty(VERSION);
765 serviceName = props.getProperty(SERVICE_NAME);
766 env = props.getProperty(ENVIRONMENT);
767 partner = props.getProperty(PARTNER);
768 routeOffer = props.getProperty(ROUTE_OFFER);
769 subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic;
771 protocol = props.getProperty(PROTOCOL);
772 methodType = props.getProperty(METHOD_TYPE);
773 dmeuser = props.getProperty(USERNAME);
774 dmepassword = props.getProperty(PASSWORD);
775 contentType = props.getProperty(CONTENT_TYPE);
776 handlers = props.getProperty(SESSION_STICKINESS_REQUIRED);
778 MRSimplerBatchPublisher.routerFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
781 * Changes to DME2Client url to use Partner for auto failover
782 * between data centers When Partner value is not provided use the
783 * routeOffer value for auto failover within a cluster
786 String partitionKey = props.getProperty(PARTITION);
788 if (partner != null && !partner.isEmpty()) {
789 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
791 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
792 url = url + "&partitionKey=" + partitionKey;
794 } else if (routeOffer != null && !routeOffer.isEmpty()) {
795 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
797 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
798 url = url + "&partitionKey=" + partitionKey;
802 DMETimeOuts = new HashMap<>();
803 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
804 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
805 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
806 DMETimeOuts.put("Content-Type", contentType);
807 System.setProperty("AFT_LATITUDE", latitude);
808 System.setProperty("AFT_LONGITUDE", longitude);
809 System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT));
810 // System.setProperty("DME2.DEBUG", "true");
813 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
815 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
816 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
817 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
821 sender = new DME2Client(new URI(url), 5000L);
823 sender.setAllowAllHttpReturnCodes(true);
824 sender.setMethod(methodType);
825 sender.setSubContext(subContextPath);
826 sender.setCredentials(dmeuser, dmepassword);
827 sender.setHeaders(DMETimeOuts);
828 if ("yes".equalsIgnoreCase(handlers)) {
829 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
830 props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
831 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
832 props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS));
833 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON));
835 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
837 } catch (DME2Exception x) {
838 getLog().warn(x.getMessage(), x);
839 throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
840 } catch (URISyntaxException x) {
842 getLog().warn(x.getMessage(), x);
843 throw new URISyntaxException(url, x.getMessage());
844 } catch (Exception x) {
846 getLog().warn(x.getMessage(), x);
847 throw new IllegalArgumentException(x.getMessage());
851 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
852 boolean compress) throws MalformedURLException {
855 if (topic == null || topic.length() < 1) {
856 throw new IllegalArgumentException("A topic must be provided.");
859 fHostSelector = new HostSelector(hosts, null);
862 fMaxBatchSize = maxBatchSize;
863 fMaxBatchAgeMs = maxBatchAgeMs;
864 fCompress = compress;
866 fPending = new LinkedBlockingQueue<>();
867 fDontSendUntilMs = 0;
868 fExec = new ScheduledThreadPoolExecutor(1);
869 pubResponse = new MRPublisherResponse();
873 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
874 boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurrence) throws MalformedURLException {
877 if (topic == null || topic.length() < 1) {
878 throw new IllegalArgumentException("A topic must be provided.");
881 fHostSelector = new HostSelector(hosts, null);
884 fMaxBatchSize = maxBatchSize;
885 fMaxBatchAgeMs = maxBatchAgeMs;
886 fCompress = compress;
887 threadOccurrenceTime = httpThreadOccurrence;
888 fPending = new LinkedBlockingQueue<>();
889 fDontSendUntilMs = 0;
890 fExec = new ScheduledThreadPoolExecutor(1);
891 fExec.scheduleAtFixedRate(new Runnable() {
896 }, 100, threadOccurrenceTime, TimeUnit.MILLISECONDS);
897 pubResponse = new MRPublisherResponse();
900 private static class TimestampedMessage extends Message {
901 public TimestampedMessage(Message message) {
903 timestamp = Clock.now();
906 public final long timestamp;
909 public String getUsername() {
913 public void setUsername(String username) {
914 this.username = username;
917 public String getPassword() {
921 public void setPassword(String password) {
922 this.password = password;
925 public String getHost() {
929 public void setHost(String host) {
933 public String getContentType() {
937 public void setContentType(String contentType) {
938 this.contentType = contentType;
941 public String getAuthKey() {
945 public void setAuthKey(String authKey) {
946 this.authKey = authKey;
949 public String getAuthDate() {
953 public void setAuthDate(String authDate) {
954 this.authDate = authDate;