1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2021 Orange.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
25 package org.onap.dmaap.mr.client.impl;
27 import com.att.aft.dme2.api.DME2Client;
28 import com.att.aft.dme2.api.DME2Exception;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.net.MalformedURLException;
34 import java.net.URISyntaxException;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.LinkedList;
38 import java.util.List;
40 import java.util.Properties;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.zip.GZIPOutputStream;
46 import javax.ws.rs.core.MultivaluedMap;
47 import org.apache.http.HttpException;
48 import org.apache.http.HttpStatus;
49 import org.json.JSONArray;
50 import org.json.JSONObject;
51 import org.json.JSONTokener;
52 import org.onap.dmaap.mr.client.DmaapClientConst;
53 import org.onap.dmaap.mr.client.HostSelector;
54 import org.onap.dmaap.mr.client.MRBatchingPublisher;
55 import org.onap.dmaap.mr.client.ProtocolType;
56 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
// Publisher that queues messages in memory (fPending) and posts them in batches, see
// sendBatch() and sendBatchWithResponse(). Extends MRBaseClient and implements
// MRBatchingPublisher.
60 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
// Class-level SLF4J logger.
61 private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
// Names of the DME2 headers that configureDME2() adds to the DME2 sender.
63 private static final String HEADER_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
64 private static final String HEADER_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
65 private static final String HEADER_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
// Content-type value that selects the plain-text branch when a batch is serialized.
67 private static final String CONTENT_TYPE_TEXT = "text/plain";
// JSON key under which replies carry the HTTP status code.
69 private static final String JSON_STATUS = "status";
// Fluent builder; build() hands the collected settings to the private constructor.
71 public static class Builder {
// Accepts the base URLs to publish against.
// NOTE(review): presumably stored in fUrls — confirm.
73 public Builder againstUrls(Collection<String> baseUrls) {
// Variant that additionally takes service names and a transport type.
// NOTE(review): how serviceName and transportype are used is not visible here — confirm.
78 public Builder againstUrlsOrServiceName(Collection<String> baseUrls, Collection<String> serviceName, String transportype) {
// Accepts the topic name.
// NOTE(review): presumably stored in fTopic — confirm.
83 public Builder onTopic(String topic) {
// Sets both batching limits: the maximum number of messages per batch and the maximum
// batch age in milliseconds.
88 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
89 fMaxBatchSize = maxBatchSize;
90 fMaxBatchAgeMs = maxBatchAgeMs;
// Accepts the compression flag.
// NOTE(review): presumably stored in fCompress — confirm.
94 public Builder compress(boolean compress) {
// Sets threadOccurrenceTime; the constructor uses the value it receives for this as the
// period, in milliseconds, of its scheduled task.
99 public Builder httpThreadTime(int threadOccurrenceTime) {
100 this.threadOccurrenceTime = threadOccurrenceTime;
// Records whether self-signed certificates are allowed.
104 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
105 fAllowSelfSignedCerts = allowSelfSignedCerts;
// Records the with-response flag in fWithResponse.
109 public Builder withResponse(boolean withResponse) {
110 fWithResponse = withResponse;
114 public MRSimplerBatchPublisher build() {
115 if (!fWithResponse) {
117 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
118 fAllowSelfSignedCerts, threadOccurrenceTime);
119 } catch (MalformedURLException e) {
120 throw new IllegalArgumentException(e);
124 return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
125 fAllowSelfSignedCerts, fMaxBatchSize);
126 } catch (MalformedURLException e) {
127 throw new IllegalArgumentException(e);
// Builder state. The initializers are the defaults used when the matching setter is
// never called.
133 private Collection<String> fUrls;
134 private String fTopic;
// Default: at most 100 messages per batch.
135 private int fMaxBatchSize = 100;
// Default: maximum batch age of 1000 ms.
137 private long fMaxBatchAgeMs = 1000;
138 private boolean fCompress = false;
// Default: 50; handed to the constructor's httpThreadOccurrence parameter by build().
139 private int threadOccurrenceTime = 50;
140 private boolean fAllowSelfSignedCerts = false;
141 private boolean fWithResponse = false;
146 public int send(String partition, String msg) {
147 return send(new Message(partition, msg));
151 public int send(String msg) {
152 return send(new Message(null, msg));
// Single-message overload. Creates a fresh list here.
// NOTE(review): presumably adds msg to the list and delegates to
// send(Collection<Message>) — confirm.
156 public int send(Message msg) {
157 final LinkedList<Message> list = new LinkedList<>();
// Queues every message in msgs on fPending, each wrapped in a TimestampedMessage, and
// returns the pending-message count afterwards. Synchronized on this publisher.
163 public synchronized int send(Collection<Message> msgs) {
// NOTE(review): the condition guarding this throw is not visible here; presumably it
// tests fClosed — confirm.
165 throw new IllegalStateException("The publisher was closed.");
168 for (Message userMsg : msgs) {
// The wrapper records the enqueue time used later to age the batch.
169 fPending.add(new TimestampedMessage(userMsg));
171 return getPendingMessageCount();
175 public synchronized int getPendingMessageCount() {
176 return fPending.size();
180 public void close() {
182 final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
183 if (remains.isEmpty()) {
184 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
187 } catch (InterruptedException e) {
188 getLog().warn("Possible message loss. " + e.getMessage(), e);
189 Thread.currentThread().interrupt();
190 } catch (IOException e) {
191 getLog().warn("Possible message loss. " + e.getMessage(), e);
// Closes the publisher with a bounded wait and returns the messages still queued.
//
// time/unit: maximum wait; converted to milliseconds below.
// Returns: the messages drained from fPending at the end.
// Declared to throw IOException and InterruptedException.
196 public List<Message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
197 synchronized (this) {
200 // stop the background sender
// Periodic and delayed tasks must not keep running once the executor is shut down.
201 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
202 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
206 final long now = Clock.now();
207 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
// NOTE(review): close() calls this method with Long.MAX_VALUE milliseconds. If
// Clock.now() is positive, now + waitInMs overflows to a negative deadline and the loop
// condition below is false on its first test — consider a saturating add.
208 final long timeoutAtMs = now + waitInMs;
// Keep going while the deadline has not passed and messages remain.
// NOTE(review): the loop body is not visible here; presumably it forces a send — confirm.
210 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
215 synchronized (this) {
// Move whatever is still unsent into the list handed back to the caller.
216 final LinkedList<Message> result = new LinkedList<>();
217 fPending.drainTo(result);
223 * Possibly send a batch to the MR server. This is called by the background
224 * thread and the close() method
228 private synchronized void send(boolean force) {
229 if ((force || shouldSendNow()) && !sendBatch()) {
230 getLog().warn("Send failed, {} message to send.", fPending.size());
231 // note the time for back-off
232 fDontSendUntilMs = SF_WAIT_AFTER_ERROR + Clock.now();
// Decides whether the pending batch is due. Only evaluated when fPending is non-empty.
236 private synchronized boolean shouldSendNow() {
237 boolean shouldSend = false;
238 if (!fPending.isEmpty()) {
239 final long nowMs = Clock.now();
// Size trigger: the queue has reached the maximum batch size.
241 shouldSend = (fPending.size() >= fMaxBatchSize);
// Age trigger: the oldest queued message has waited fMaxBatchAgeMs.
// NOTE(review): presumably this pair of lines runs only when the size trigger did not
// fire (the guard is not visible here) — confirm.
243 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
244 shouldSend = sendAtMs <= nowMs;
247 // however, wait after an error
248 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
254 * Method to parse published JSON Objects and Arrays.
// Builds one JSONArray from every message in fPending. A message starting with '[' is
// parsed as an array and its elements are appended one by one; any other message is
// parsed as a single JSON object and appended.
258 private JSONArray parseJSON() {
259 JSONArray jsonArray = new JSONArray();
260 for (TimestampedMessage m : fPending) {
261 JSONTokener jsonTokener = new JSONTokener(m.fMsg);
262 JSONObject jsonObject = null;
263 JSONArray tempjsonArray = null;
// Read the first character to tell arrays from objects.
// NOTE(review): next() consumes that character; confirm the tokener is stepped back
// before it is handed to the JSONArray/JSONObject constructors below.
264 final char firstChar = jsonTokener.next();
266 if ('[' == firstChar) {
267 tempjsonArray = new JSONArray(jsonTokener);
268 for (int i = 0; i < tempjsonArray.length(); i++) {
// getJSONObject requires every array element to be a JSON object.
269 jsonArray.put(tempjsonArray.getJSONObject(i));
272 jsonObject = new JSONObject(jsonTokener);
273 jsonArray.put(jsonObject);
280 private void logTime(long startMs, String dmeResponse) {
281 if (getLog().isInfoEnabled()) {
282 getLog().info("MR reply ok ({} ms):{}", (Clock.now() - startMs), dmeResponse);
286 private void logSendMessage(int nbMessage, String dest, long time) {
287 if (getLog().isInfoEnabled()) {
288 getLog().info("sending {} msgs to {}. Oldest: {} ms", nbMessage, dest, time);
// Serializes everything in fPending into one payload, according to the configured content
// type, and posts it over the transport selected by protocolFlag. Synchronized on this
// publisher.
// NOTE(review): the return statements are not visible here; send(boolean) treats a false
// result as a failed send — confirm which paths return true.
292 private synchronized boolean sendBatch() {
293 // it's possible for this call to be made with an empty list. in this
294 // case, just return.
295 if (fPending.isEmpty()) {
// Captured once; used below to compute the age of the oldest message for logging.
299 final long nowMs = Clock.now();
// Pick a base host only when a selector is configured.
301 if (this.fHostSelector != null) {
302 host = this.fHostSelector.selectBaseHost();
// Target URL for the HTTP transports, built from host, topic and the protocol and
// partition properties.
305 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(DmaapClientConst.PROTOCOL),
306 props.getProperty(DmaapClientConst.PARTITION));
// The payload is built in memory; os may be re-pointed at a GZIP wrapper around baseStream.
310 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
311 OutputStream os = baseStream;
// NOTE(review): getProperty returns null when the property is unset; the
// equalsIgnoreCase calls below would then throw NullPointerException.
312 final String contentType = props.getProperty(DmaapClientConst.CONTENT_TYPE);
// JSON: all pending messages merged into one JSON array.
313 if (contentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
314 JSONArray jsonArray = parseJSON();
// NOTE(review): getBytes() without a charset uses the platform default before JDK 18 —
// consider an explicit UTF-8 here and in the writes below.
315 os.write(jsonArray.toString().getBytes());
// Plain text: message bodies are written one after another.
318 } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
319 for (TimestampedMessage m : fPending) {
320 os.write(m.fMsg.getBytes());
// Cambria and Cambria-zip share one encoding; the zip variant writes through a
// GZIPOutputStream.
324 } else if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
325 || (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
326 if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
327 os = new GZIPOutputStream(baseStream);
// Per message: partition length, message length, partition, message.
// NOTE(review): length() counts chars while getBytes() emits bytes; the two differ for
// non-ASCII text — confirm what the wire format expects.
329 for (TimestampedMessage m : fPending) {
331 os.write(("" + m.fPartition.length()).getBytes());
333 os.write(("" + m.fMsg.length()).getBytes());
335 os.write(m.fPartition.getBytes());
336 os.write(m.fMsg.getBytes());
// NOTE(review): presumably the fallback for any other content type — message bodies are
// written one after another.
341 for (TimestampedMessage m : fPending) {
342 os.write(m.fMsg.getBytes());
// Start of the timed send; logTime() reports the elapsed time against this value.
348 final long startMs = Clock.now();
// DME2 transport: the payload goes through the DME2 sender.
349 if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
354 if (fPending.peek() != null) {
355 logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
// NOTE(review): if os is the GZIPOutputStream, toString() is Object.toString() and not
// the payload; for the ByteArrayOutputStream it decodes with the platform default charset.
357 sender.setPayload(os.toString());
// NOTE(review): 5000L is presumably a timeout in milliseconds — confirm against the DME2 API.
358 String dmeResponse = sender.sendAndWait(5000L);
360 logTime(startMs, dmeResponse);
// Auth-key transport: HTTP post carrying authKey/authDate and the user credentials.
365 if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
366 if (fPending.peek() != null) {
367 logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
369 final JSONObject result =
370 postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
371 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
372 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
373 // Here we are checking for error response. If HTTP status
374 // code is not within the http success response code
375 // then we consider this as error and return false
376 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
379 logTime(startMs, result.toString());
// AAF transport: HTTP post with username and password.
384 if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
385 if (fPending.peek() != null) {
386 logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
388 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
391 // Here we are checking for error response. If HTTP status
392 // code is not within the http success response code
393 // then we consider this as error and return false
394 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
397 logTime(startMs, result.toString());
// Unauthenticated HTTP transport.
402 if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
403 if (fPending.peek() != null) {
404 logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
406 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
408 // Here we are checking for error response. If HTTP status
409 // code is not within the http success response code
410 // then we consider this as error and return false
411 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
414 logTime(startMs, result.toString());
418 } catch (InterruptedException e) {
419 getLog().warn("Interrupted!", e);
420 // Restore interrupted state...
421 Thread.currentThread().interrupt();
// Any other failure is only logged here.
422 } catch (Exception x) {
423 getLog().warn(x.getMessage(), x);
// Variant of sendBatch() that reports the outcome through pubResponse (response code,
// message and pending-message count) instead of a boolean. Synchronized on this publisher.
// NOTE(review): the return statements are not visible here — confirm that every path
// returns pubResponse.
428 public synchronized MRPublisherResponse sendBatchWithResponse() {
429 // it's possible for this call to be made with an empty list. in this
430 // case, just return.
431 if (fPending.isEmpty()) {
// An empty queue is reported as 400 with an explanatory message.
432 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
433 pubResponse.setResponseMessage("No Messages to send");
437 final long nowMs = Clock.now();
// NOTE(review): unlike sendBatch(), no null check on fHostSelector is visible before
// this call — confirm.
439 host = this.fHostSelector.selectBaseHost();
441 final String httpUrl = MRConstants.makeUrl(host, fTopic, props.getProperty(DmaapClientConst.PROTOCOL),
442 props.getProperty(DmaapClientConst.PARTITION));
// NOTE(review): os starts as null and is written to below; presumably it is assigned
// baseStream on a line not visible here — confirm.
443 OutputStream os = null;
444 try (ByteArrayOutputStream baseStream = new ByteArrayOutputStream()) {
// NOTE(review): null when the property is unset; the equalsIgnoreCase calls below would
// then throw NullPointerException.
446 final String propsContentType = props.getProperty(DmaapClientConst.CONTENT_TYPE);
// JSON: all pending messages merged into one JSON array.
447 if (propsContentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
448 JSONArray jsonArray = parseJSON();
// NOTE(review): getBytes() without a charset uses the platform default before JDK 18 —
// consider an explicit UTF-8 here and in the writes below.
449 os.write(jsonArray.toString().getBytes());
// Plain text: message bodies are written one after another.
450 } else if (propsContentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
451 for (TimestampedMessage m : fPending) {
452 os.write(m.fMsg.getBytes());
// Cambria and Cambria-zip share one encoding; the zip variant writes through a
// GZIPOutputStream.
455 } else if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
456 || (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
457 if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
458 os = new GZIPOutputStream(baseStream);
// Per message: partition length, message length, partition, message.
460 for (TimestampedMessage m : fPending) {
461 os.write(("" + m.fPartition.length()).getBytes());
463 os.write(("" + m.fMsg.length()).getBytes());
465 os.write(m.fPartition.getBytes());
466 os.write(m.fMsg.getBytes());
// NOTE(review): presumably the fallback for any other content type.
471 for (TimestampedMessage m : fPending) {
472 os.write(m.fMsg.getBytes());
477 final long startMs = Clock.now();
// DME2 transport.
478 if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
485 if (fPending.peek() != null) {
486 logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
// NOTE(review): if os is the GZIPOutputStream, toString() is Object.toString() and not
// the payload; for the ByteArrayOutputStream it decodes with the platform default charset.
488 sender.setPayload(os.toString());
490 String dmeResponse = sender.sendAndWait(5000L);
// Translate the raw reply into a response code and message.
492 pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
// Anything outside 200..299 counts as a failure.
// NOTE(review): parseInt throws NumberFormatException if the response code is null or
// not numeric; that would land in the generic catch below.
494 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
495 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
// Logs the elapsed time immediately followed by the reply text, with no separator.
499 final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
500 getLog().info(logLine);
// DME2 failures: the exception's own error code and message are reported.
503 } catch (DME2Exception x) {
504 getLog().warn(x.getMessage(), x);
505 pubResponse.setResponseCode(x.getErrorCode());
506 pubResponse.setResponseMessage(x.getErrorMessage());
507 } catch (URISyntaxException x) {
509 getLog().warn(x.getMessage(), x);
510 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
511 pubResponse.setResponseMessage(x.getMessage());
// NOTE(review): no handler body is visible for this catch — confirm that it restores the
// interrupt flag like the InterruptedException handler further down.
512 } catch (InterruptedException e) {
514 } catch (Exception x) {
516 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
517 pubResponse.setResponseMessage(x.getMessage());
518 logger.error("exception: ", x);
// Auth-key transport.
525 if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
526 if (fPending.peek() != null) {
527 logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
// NOTE(review): the post sends the contentType field, while the payload above was encoded
// according to propsContentType — confirm the two always agree.
529 final String result = postAuthwithResponse(httpUrl, baseStream.toByteArray(), contentType, authKey,
530 authDate, username, password, protocolFlag);
531 // Here we are checking for error response. If HTTP status
532 // code is not within the http success response code
533 // then we consider this as error and return false
535 pubResponse = createMRPublisherResponse(result, pubResponse);
537 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
538 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
543 logTime(startMs, result);
// AAF transport.
548 if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
549 if (fPending.peek() != null) {
550 logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
552 final String result = postWithResponse(httpUrl, baseStream.toByteArray(), contentType, username,
553 password, protocolFlag);
555 // Here we are checking for error response. If HTTP status
556 // code is not within the http success response code
557 // then we consider this as error and return false
558 pubResponse = createMRPublisherResponse(result, pubResponse);
560 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
561 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
// Only the elapsed time is logged on this path.
566 final String logLine = String.valueOf((Clock.now() - startMs));
567 getLog().info(logLine);
// Unauthenticated HTTP transport.
572 if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
573 if (fPending.peek() != null) {
574 logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
576 final String result = postNoAuthWithResponse(httpUrl, baseStream.toByteArray(), contentType);
578 // Here we are checking for error response. If HTTP status
579 // code is not within the http success response code
580 // then we consider this as error and return false
581 pubResponse = createMRPublisherResponse(result, pubResponse);
583 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
584 || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
589 final String logLine = String.valueOf((Clock.now() - startMs));
590 getLog().info(logLine);
// Error mapping: bad arguments and HTTP protocol errors are reported as 400; I/O and
// unexpected failures as 500.
594 } catch (IllegalArgumentException | HttpException x) {
595 getLog().warn(x.getMessage(), x);
596 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
597 pubResponse.setResponseMessage(x.getMessage());
599 } catch (IOException x) {
600 getLog().warn(x.getMessage(), x);
601 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
602 pubResponse.setResponseMessage(x.getMessage());
603 } catch (InterruptedException e) {
604 getLog().warn("Interrupted!", e);
605 // Restore interrupted state...
606 Thread.currentThread().interrupt();
607 } catch (Exception x) {
608 getLog().warn(x.getMessage(), x);
610 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
611 pubResponse.setResponseMessage(x.getMessage());
// Messages still queued at this point are reported back through the response.
614 if (!fPending.isEmpty()) {
615 getLog().warn("Send failed, " + fPending.size() + " message to send.");
616 pubResponse.setPendingMsgs(fPending.size());
// The message set below indicates that this handler covers closing the output stream.
621 } catch (Exception x) {
622 getLog().warn(x.getMessage(), x);
623 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
624 pubResponse.setResponseMessage("Error in closing Output Stream");
632 public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
634 if (reply.isEmpty()) {
636 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
637 mrPubResponse.setResponseMessage("Please verify the Producer properties");
638 } else if (reply.startsWith("{")) {
639 JSONObject jObject = new JSONObject(reply);
640 if (jObject.has("message") && jObject.has(JSON_STATUS)) {
641 String message = jObject.getString("message");
642 if (null != message) {
643 mrPubResponse.setResponseMessage(message);
645 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
647 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
648 mrPubResponse.setResponseMessage(reply);
650 } else if (reply.startsWith("<")) {
651 String responseCode = getHTTPErrorResponseCode(reply);
652 if (responseCode.contains("403")) {
653 responseCode = "403";
655 mrPubResponse.setResponseCode(responseCode);
656 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
659 return mrPubResponse;
// Publisher configuration assigned by the constructors.
662 private final String fTopic;
663 private final int fMaxBatchSize;
664 private final long fMaxBatchAgeMs;
665 private final boolean fCompress;
// Period in milliseconds of the scheduled task (set by the 7-argument constructor).
666 private int threadOccurrenceTime;
667 private boolean fClosed;
// Credentials passed to the authenticated HTTP post calls.
668 private String username;
669 private String password;
// Chooses the base host for each batch; sendBatch() checks it for null before use.
673 private HostSelector fHostSelector = null;
// Messages waiting to be sent, each stamped with the time it was wrapped.
675 private final LinkedBlockingQueue<TimestampedMessage> fPending;
// After a failed send, automatic sends are held back until Clock.now() reaches this value.
676 private long fDontSendUntilMs;
// Scheduler created with a core pool size of 1.
677 private final ScheduledThreadPoolExecutor fExec;
// DME2 settings, loaded from props by configureDME2().
679 private String latitude;
680 private String longitude;
681 private String version;
682 private String serviceName;
684 private String partner;
685 private String routeOffer;
686 private String subContextPath;
687 private String protocol;
688 private String methodType;
690 private String dmeuser;
691 private String dmepassword;
692 private String contentType;
// Back-off added to Clock.now() after a failed send; presumably milliseconds — TODO
// confirm Clock's unit.
693 private static final long SF_WAIT_AFTER_ERROR = 10000;
// Timeout properties plus Content-Type, handed to the DME2 sender as headers.
694 private HashMap<String, String> DMETimeOuts;
695 private DME2Client sender;
// Selects the transport; defaults to DME2.
// NOTE(review): public mutable field that also has a getter and a setter.
696 public String protocolFlag = ProtocolType.DME2.getValue();
697 private String authKey;
698 private String authDate;
// Value of the SESSION_STICKINESS_REQUIRED property; "yes" enables the extra DME2
// handler headers in configureDME2().
699 private String handlers;
700 private Properties props;
// NOTE(review): public static mutable state shared by all instances.
701 public static String routerFilePath;
702 protected static final Map<String, String> headers = new HashMap<String, String>();
703 public static MultivaluedMap<String, Object> headersMap;
// Response object reused and updated by sendBatchWithResponse().
705 private MRPublisherResponse pubResponse;
// Accessors for the publish response object.
707 public MRPublisherResponse getPubResponse() {
711 public void setPubResponse(MRPublisherResponse pubResponse) {
712 this.pubResponse = pubResponse;
// Accessors for the static routerFilePath, which is shared by all instances.
715 public static String getRouterFilePath() {
716 return routerFilePath;
719 public static void setRouterFilePath(String routerFilePath) {
720 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
// Accessors for the client properties.
723 public Properties getProps() {
// Also derives a client configuration from the properties via DmaapClientUtil and
// installs it with setClientConfig.
// NOTE(review): presumably the properties are stored in the props field as well — confirm.
727 public void setProps(Properties props) {
729 setClientConfig(DmaapClientUtil.getClientConfig(props));
// Accessors for the protocol flag that selects the transport.
732 public String getProtocolFlag() {
736 public void setProtocolFlag(String protocolFlag) {
737 this.protocolFlag = protocolFlag;
// Loads the DME2 settings from props, builds the DME2 service URL, sets the AFT/DME2
// system properties and creates and configures the DME2 sender.
// Declared to throw Exception; the handlers at the end log and rethrow.
740 private void configureDME2() throws Exception {
743 latitude = props.getProperty(DmaapClientConst.LATITUDE);
744 longitude = props.getProperty(DmaapClientConst.LONGITUDE);
745 version = props.getProperty(DmaapClientConst.VERSION);
746 serviceName = props.getProperty(DmaapClientConst.SERVICE_NAME);
747 env = props.getProperty(DmaapClientConst.ENVIRONMENT);
748 partner = props.getProperty(DmaapClientConst.PARTNER);
749 routeOffer = props.getProperty(DmaapClientConst.ROUTE_OFFER);
// The sub-context path is the configured prefix followed by the topic name.
// NOTE(review): if the property is unset this concatenates the text "null" with the topic.
750 subContextPath = props.getProperty(DmaapClientConst.SUB_CONTEXT_PATH) + fTopic;
752 protocol = props.getProperty(DmaapClientConst.PROTOCOL);
753 methodType = props.getProperty(DmaapClientConst.METHOD_TYPE);
754 dmeuser = props.getProperty(DmaapClientConst.USERNAME);
755 dmepassword = props.getProperty(DmaapClientConst.PASSWORD);
756 contentType = props.getProperty(DmaapClientConst.CONTENT_TYPE);
757 handlers = props.getProperty(DmaapClientConst.SESSION_STICKINESS_REQUIRED);
// Writes to the static field, so the value is shared by every publisher instance.
759 MRSimplerBatchPublisher.routerFilePath = props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH);
762 * Changes to DME2Client url to use Partner for auto failover
763 * between data centers When Partner value is not provided use the
764 * routeOffer value for auto failover within a cluster
767 String partitionKey = props.getProperty(DmaapClientConst.PARTITION);
// Partner takes precedence over routeOffer; in both cases a non-empty partition key is
// appended as a query parameter.
// NOTE(review): the values are concatenated into the URL without URL-encoding.
769 if (partner != null && !partner.isEmpty()) {
770 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
772 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
773 url = url + "&partitionKey=" + partitionKey;
775 } else if (routeOffer != null && !routeOffer.isEmpty()) {
776 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
778 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
779 url = url + "&partitionKey=" + partitionKey;
// Header map for the sender: three timeout properties plus the content type.
783 DMETimeOuts = new HashMap<>();
784 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(DmaapClientConst.AFT_DME2_EP_READ_TIMEOUT_MS));
785 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(DmaapClientConst.AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
786 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(DmaapClientConst.AFT_DME2_EP_CONN_TIMEOUT));
787 DMETimeOuts.put("Content-Type", contentType);
// JVM-wide system properties.
// NOTE(review): System.setProperty throws NullPointerException for a null value, so a
// missing latitude, longitude or AFT_ENVIRONMENT property fails here.
788 System.setProperty("AFT_LATITUDE", latitude);
789 System.setProperty("AFT_LONGITUDE", longitude);
790 System.setProperty("AFT_ENVIRONMENT", props.getProperty(DmaapClientConst.AFT_ENVIRONMENT));
791 // System.setProperty("DME2.DEBUG", "true");
794 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
796 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
797 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
// NOTE(review): hard-coded keystore password — consider reading it from configuration.
798 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
// Create the sender for the URL built above.
// NOTE(review): 5000L is presumably a timeout in milliseconds — confirm against the DME2 API.
802 sender = new DME2Client(new URI(url), 5000L);
804 sender.setAllowAllHttpReturnCodes(true);
805 sender.setMethod(methodType);
806 sender.setSubContext(subContextPath);
807 sender.setCredentials(dmeuser, dmepassword);
808 sender.setHeaders(DMETimeOuts);
// With session stickiness set to "yes", handler and trace headers from the properties
// are added, followed by a fixed request handler.
// NOTE(review): whether the last addHeader sits inside this if is not visible here — confirm.
809 if ("yes".equalsIgnoreCase(handlers)) {
810 sender.addHeader(HEADER_DME2_EXCHANGE_REQUEST_HANDLERS,
811 props.getProperty(DmaapClientConst.AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
812 sender.addHeader(HEADER_DME2_EXCHANGE_REPLY_HANDLERS,
813 props.getProperty(DmaapClientConst.AFT_DME2_EXCHANGE_REPLY_HANDLERS));
814 sender.addHeader(HEADER_DME2_REQ_TRACE_ON, props.getProperty(DmaapClientConst.AFT_DME2_REQ_TRACE_ON));
816 sender.addHeader(HEADER_DME2_EXCHANGE_REQUEST_HANDLERS, "com.att.nsa.mr.dme.client.HeaderReplyHandler");
// Each handler logs and rethrows a newly constructed exception.
// NOTE(review): the new exceptions do not carry the original as their cause, so the
// original stack trace survives only in the log.
818 } catch (DME2Exception x) {
819 getLog().warn(x.getMessage(), x);
820 throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
821 } catch (URISyntaxException x) {
823 getLog().warn(x.getMessage(), x);
824 throw new URISyntaxException(url, x.getMessage());
825 } catch (Exception x) {
827 getLog().warn(x.getMessage(), x);
828 throw new IllegalArgumentException(x.getMessage());
// Five-argument constructor: validates the topic and sets up host selection, the batching
// limits, the pending queue, the scheduler and the response object. No task is scheduled
// on fExec in the lines visible here.
// Throws IllegalArgumentException when topic is null or empty; declared to throw
// MalformedURLException.
832 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
833 boolean compress) throws MalformedURLException {
836 if (topic == null || topic.length() < 1) {
837 throw new IllegalArgumentException("A topic must be provided.");
840 fHostSelector = new HostSelector(hosts, null);
843 fMaxBatchSize = maxBatchSize;
844 fMaxBatchAgeMs = maxBatchAgeMs;
845 fCompress = compress;
847 fPending = new LinkedBlockingQueue<>();
// Zero means no back-off is in effect.
848 fDontSendUntilMs = 0;
849 fExec = new ScheduledThreadPoolExecutor(1);
850 pubResponse = new MRPublisherResponse();
// Seven-argument constructor, the one Builder.build() calls. Validates the topic, sets up
// host selection, the batching limits, the pending queue and a scheduler, then schedules a
// periodic task on it.
// Throws IllegalArgumentException when topic is null or empty; declared to throw
// MalformedURLException.
// NOTE(review): allowSelfSignedCerts is not used in the lines visible here — confirm.
854 private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
855 boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurrence) throws MalformedURLException {
858 if (topic == null || topic.length() < 1) {
859 throw new IllegalArgumentException("A topic must be provided.");
862 fHostSelector = new HostSelector(hosts, null);
865 fMaxBatchSize = maxBatchSize;
866 fMaxBatchAgeMs = maxBatchAgeMs;
867 fCompress = compress;
868 threadOccurrenceTime = httpThreadOccurrence;
869 fPending = new LinkedBlockingQueue<>();
// Zero means no back-off is in effect.
870 fDontSendUntilMs = 0;
871 fExec = new ScheduledThreadPoolExecutor(1);
// NOTE(review): the Runnable body is not visible here; presumably it calls send(false) — confirm.
872 fExec.scheduleAtFixedRate(new Runnable() {
// First run after 100 ms, then every threadOccurrenceTime ms. scheduleAtFixedRate
// rejects a period <= 0 with IllegalArgumentException.
877 }, 100, threadOccurrenceTime, TimeUnit.MILLISECONDS);
878 pubResponse = new MRPublisherResponse();
// A Message plus the time at which it was wrapped; send(Collection) creates one for each
// queued message.
881 private static class TimestampedMessage extends Message {
// NOTE(review): presumably copies the wrapped message through the superclass
// constructor — confirm.
882 public TimestampedMessage(Message message) {
// Capture the creation time; shouldSendNow() and the send logging use it as the message age.
884 timestamp = Clock.now();
// Creation time on the Clock.now() scale.
887 public final long timestamp;
// Accessors for the credentials used by the authenticated HTTP posts.
890 public String getUsername() {
894 public void setUsername(String username) {
895 this.username = username;
898 public String getPassword() {
902 public void setPassword(String password) {
903 this.password = password;
// Accessors for the host. NOTE(review): the field they read and write is not visible
// here — confirm.
906 public String getHost() {
910 public void setHost(String host) {
// Accessors for the contentType field, which sendBatchWithResponse() passes to its HTTP posts.
914 public String getContentType() {
918 public void setContentType(String contentType) {
919 this.contentType = contentType;
// Accessors for the key and date used by the auth-key transport.
922 public String getAuthKey() {
926 public void setAuthKey(String authKey) {
927 this.authKey = authKey;
930 public String getAuthDate() {
934 public void setAuthDate(String authDate) {
935 this.authDate = authDate;