1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
41 import javax.ws.rs.core.MultivaluedMap;
43 import org.apache.http.HttpException;
44 import org.apache.http.HttpStatus;
45 import org.json.JSONArray;
46 import org.json.JSONObject;
48 import com.att.aft.dme2.api.DME2Client;
49 import com.att.aft.dme2.api.DME2Exception;
50 import com.att.nsa.mr.client.HostSelector;
51 import com.att.nsa.mr.client.MRBatchingPublisher;
52 import com.att.nsa.mr.client.response.MRPublisherResponse;
53 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
55 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
57 public static class Builder
63 public Builder againstUrls ( Collection<String> baseUrls )
69 public Builder onTopic ( String topic )
75 public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
77 fMaxBatchSize = maxBatchSize;
78 fMaxBatchAgeMs = maxBatchAgeMs;
82 public Builder compress ( boolean compress )
88 public Builder httpThreadTime ( int threadOccuranceTime )
90 this.threadOccuranceTime = threadOccuranceTime;
94 public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
96 fAllowSelfSignedCerts = allowSelfSignedCerts;
100 public Builder withResponse ( boolean withResponse)
102 fWithResponse = withResponse;
105 public MRSimplerBatchPublisher build ()
110 return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
111 } catch (MalformedURLException e) {
112 throw new RuntimeException(e);
117 return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
118 } catch (MalformedURLException e) {
119 throw new RuntimeException(e);
125 private Collection<String> fUrls;
126 private String fTopic;
127 private int fMaxBatchSize = 100;
128 private long fMaxBatchAgeMs = 1000;
129 private boolean fCompress = false;
130 private int threadOccuranceTime = 50;
131 private boolean fAllowSelfSignedCerts = false;
132 private boolean fWithResponse = false;
137 public int send ( String partition, String msg )
139 return send ( new message ( partition, msg ) );
142 public int send ( String msg )
144 return send ( new message ( null, msg ) );
149 public int send ( message msg )
151 final LinkedList<message> list = new LinkedList<message> ();
153 return send ( list );
159 public synchronized int send ( Collection<message> msgs )
163 throw new IllegalStateException ( "The publisher was closed." );
166 for ( message userMsg : msgs )
168 fPending.add ( new TimestampedMessage ( userMsg ) );
170 return getPendingMessageCount ();
174 public synchronized int getPendingMessageCount ()
176 return fPending.size ();
184 final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
185 if ( remains.size() > 0 )
187 getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
188 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
191 catch ( InterruptedException e )
193 getLog().warn ( "Possible message loss. " + e.getMessage(), e );
195 catch ( IOException e )
197 getLog().warn ( "Possible message loss. " + e.getMessage(), e );
202 public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
204 synchronized ( this )
208 // stop the background sender
209 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
210 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
214 final long now = Clock.now ();
215 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
216 final long timeoutAtMs = now + waitInMs;
218 while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
221 Thread.sleep ( 250 );
224 synchronized ( this )
226 final LinkedList<message> result = new LinkedList<message> ();
227 fPending.drainTo ( result );
233 * Possibly send a batch to the MR server. This is called by the background thread
234 * and the close() method
238 private synchronized void send ( boolean force )
240 if ( force || shouldSendNow () )
244 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
246 // note the time for back-off
247 fDontSendUntilMs = sfWaitAfterError + Clock.now ();
252 private synchronized boolean shouldSendNow ()
254 boolean shouldSend = false;
255 if ( fPending.size () > 0 )
257 final long nowMs = Clock.now ();
259 shouldSend = ( fPending.size() >= fMaxBatchSize );
262 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
263 shouldSend = sendAtMs <= nowMs;
266 // however, wait after an error
267 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
272 private synchronized boolean sendBatch ()
274 // it's possible for this call to be made with an empty list. in this case, just return.
275 if ( fPending.size() < 1 )
280 final long nowMs = Clock.now ();
282 host = this.fHostSelector.selectBaseHost();
284 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
289 /*final String contentType =
291 MRFormat.CAMBRIA_ZIP.toString () :
292 MRFormat.CAMBRIA.toString ()
295 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
296 OutputStream os = baseStream;
297 final String contentType = props.getProperty("contenttype");
298 if(contentType.equalsIgnoreCase("application/json")){
299 JSONArray jsonArray = new JSONArray();
300 for ( TimestampedMessage m : fPending )
302 JSONObject jsonObject = new JSONObject(m.fMsg);
304 jsonArray.put(jsonObject);
307 os.write (jsonArray.toString().getBytes() );
310 }else if (contentType.equalsIgnoreCase("text/plain")){
311 for ( TimestampedMessage m : fPending )
313 os.write ( m.fMsg.getBytes() );
317 } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
318 if ( contentType.equalsIgnoreCase("application/cambria-zip") )
320 os = new GZIPOutputStream ( baseStream );
322 for ( TimestampedMessage m : fPending )
325 os.write ( ( "" + m.fPartition.length () ).getBytes() );
327 os.write ( ( "" + m.fMsg.length () ).getBytes() );
329 os.write ( m.fPartition.getBytes() );
330 os.write ( m.fMsg.getBytes() );
335 for ( TimestampedMessage m : fPending )
337 os.write ( m.fMsg.getBytes() );
345 final long startMs = Clock.now ();
346 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
352 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
353 sender.setPayload(os.toString());
354 String dmeResponse = sender.sendAndWait(5000L);
356 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
357 + dmeResponse.toString();
358 getLog().info(logLine);
363 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
364 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
365 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
366 //System.out.println(result.getInt("status"));
367 //Here we are checking for error response. If HTTP status
368 //code is not within the http success response code
369 //then we consider this as error and return false
370 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
373 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
374 getLog().info(logLine);
379 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
380 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
381 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
384 //System.out.println(result.getInt("status"));
385 //Here we are checking for error response. If HTTP status
386 //code is not within the http success response code
387 //then we consider this as error and return false
388 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
391 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
392 getLog().info(logLine);
397 catch ( IllegalArgumentException x ) {
398 getLog().warn ( x.getMessage(), x );
399 } catch ( IOException x ) {
400 getLog().warn ( x.getMessage(), x );
401 } catch (HttpException x) {
402 getLog().warn ( x.getMessage(), x );
403 } catch (Exception x) {
404 getLog().warn(x.getMessage(), x);
409 public synchronized MRPublisherResponse sendBatchWithResponse ()
411 // it's possible for this call to be made with an empty list. in this case, just return.
412 if ( fPending.size() < 1 )
414 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
415 pubResponse.setResponseMessage("No Messages to send");
419 final long nowMs = Clock.now ();
421 host = this.fHostSelector.selectBaseHost();
423 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
424 OutputStream os=null;
428 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
430 final String contentType = props.getProperty("contenttype");
431 if(contentType.equalsIgnoreCase("application/json")){
432 JSONArray jsonArray = new JSONArray();
433 for ( TimestampedMessage m : fPending )
435 JSONObject jsonObject = new JSONObject(m.fMsg);
437 jsonArray.put(jsonObject);
440 os.write (jsonArray.toString().getBytes() );
441 }else if (contentType.equalsIgnoreCase("text/plain")){
442 for ( TimestampedMessage m : fPending )
444 os.write ( m.fMsg.getBytes() );
447 } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
448 if ( contentType.equalsIgnoreCase("application/cambria-zip") )
450 os = new GZIPOutputStream ( baseStream );
452 for ( TimestampedMessage m : fPending )
455 os.write ( ( "" + m.fPartition.length () ).getBytes() );
457 os.write ( ( "" + m.fMsg.length () ).getBytes() );
459 os.write ( m.fPartition.getBytes() );
460 os.write ( m.fMsg.getBytes() );
465 for ( TimestampedMessage m : fPending )
467 os.write ( m.fMsg.getBytes() );
474 final long startMs = Clock.now ();
475 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
482 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
483 sender.setPayload(os.toString());
486 String dmeResponse = sender.sendAndWait(5000L);
487 System.out.println("dmeres->"+dmeResponse);
490 pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
492 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
496 final String logLine = String.valueOf((Clock.now() - startMs))
497 + dmeResponse.toString();
498 getLog().info(logLine);
502 catch (DME2Exception x) {
503 getLog().warn(x.getMessage(), x);
504 pubResponse.setResponseCode(x.getErrorCode());
505 pubResponse.setResponseMessage(x.getErrorMessage());
506 } catch (URISyntaxException x) {
508 getLog().warn(x.getMessage(), x);
509 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
510 pubResponse.setResponseMessage(x.getMessage());
511 } catch (Exception x) {
513 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
514 pubResponse.setResponseMessage(x.getMessage());
521 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
522 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
523 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
524 //System.out.println(result.getInt("status"));
525 //Here we are checking for error response. If HTTP status
526 //code is not within the http success response code
527 //then we consider this as error and return false
530 pubResponse = createMRPublisherResponse(result,pubResponse);
532 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
537 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
538 getLog().info(logLine);
543 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
544 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
545 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
547 //System.out.println(result.getInt("status"));
548 //Here we are checking for error response. If HTTP status
549 //code is not within the http success response code
550 //then we consider this as error and return false
551 pubResponse = createMRPublisherResponse(result,pubResponse);
553 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
558 final String logLine = String.valueOf((Clock.now() - startMs));
559 getLog().info(logLine);
564 catch ( IllegalArgumentException x ) {
565 getLog().warn ( x.getMessage(), x );
566 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
567 pubResponse.setResponseMessage(x.getMessage());
569 } catch ( IOException x ) {
570 getLog().warn ( x.getMessage(), x );
571 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
572 pubResponse.setResponseMessage(x.getMessage());
574 } catch (HttpException x) {
575 getLog().warn ( x.getMessage(), x );
576 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
577 pubResponse.setResponseMessage(x.getMessage());
579 } catch (Exception x) {
580 getLog().warn(x.getMessage(), x);
582 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
583 pubResponse.setResponseMessage(x.getMessage());
588 if (fPending.size()>0) {
589 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
590 pubResponse.setPendingMsgs(fPending.size());
595 } catch (Exception x) {
596 getLog().warn(x.getMessage(), x);
597 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
598 pubResponse.setResponseMessage("Error in closing Output Stream");
606 private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
611 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
612 mrPubResponse.setResponseMessage("Please verify the Producer properties");
614 else if(reply.startsWith("{"))
616 JSONObject jObject = new JSONObject(reply);
617 if(jObject.has("message") && jObject.has("status"))
619 String message = jObject.getString("message");
622 mrPubResponse.setResponseMessage(message);
624 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
628 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
629 mrPubResponse.setResponseMessage(reply);
632 else if (reply.startsWith("<"))
634 String responseCode = getHTTPErrorResponseCode(reply);
635 if( responseCode.contains("403"))
637 responseCode = "403";
639 mrPubResponse.setResponseCode(responseCode);
640 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
643 return mrPubResponse;
646 private final String fTopic;
647 private final int fMaxBatchSize;
648 private final long fMaxBatchAgeMs;
649 private final boolean fCompress;
650 private int threadOccuranceTime;
651 private boolean fClosed;
652 private String username;
653 private String password;
657 private HostSelector fHostSelector = null;
659 private final LinkedBlockingQueue<TimestampedMessage> fPending;
660 private long fDontSendUntilMs;
661 private final ScheduledThreadPoolExecutor fExec;
663 private String latitude;
664 private String longitude;
665 private String version;
666 private String serviceName;
668 private String partner;
669 private String routeOffer;
670 private String subContextPath;
671 private String protocol;
672 private String methodType;
674 private String dmeuser;
675 private String dmepassword;
676 private String contentType;
677 private static final long sfWaitAfterError = 10000;
678 private HashMap<String, String> DMETimeOuts;
679 private DME2Client sender;
680 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
681 public String producerFilePath;
682 private String authKey;
683 private String authDate;
684 private String handlers;
685 private Properties props;
686 public static String routerFilePath;
687 public static Map<String, String> headers=new HashMap<String, String>();
688 public static MultivaluedMap<String, Object> headersMap;
691 private MRPublisherResponse pubResponse;
693 public MRPublisherResponse getPubResponse() {
696 public void setPubResponse(MRPublisherResponse pubResponse) {
697 this.pubResponse = pubResponse;
700 public static String getRouterFilePath() {
701 return routerFilePath;
704 public static void setRouterFilePath(String routerFilePath) {
705 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
708 public Properties getProps() {
712 public void setProps(Properties props) {
716 public String getProducerFilePath() {
717 return producerFilePath;
720 public void setProducerFilePath(String producerFilePath) {
721 this.producerFilePath = producerFilePath;
724 public String getProtocolFlag() {
728 public void setProtocolFlag(String protocolFlag) {
729 this.protocolFlag = protocolFlag;
733 private void DME2Configue() throws Exception {
736 /* FileReader reader = new FileReader(new File (producerFilePath));
737 Properties props = new Properties();
738 props.load(reader);*/
739 latitude = props.getProperty("Latitude");
740 longitude = props.getProperty("Longitude");
741 version = props.getProperty("Version");
742 serviceName = props.getProperty("ServiceName");
743 env = props.getProperty("Environment");
744 partner = props.getProperty("Partner");
745 routeOffer = props.getProperty("routeOffer");
746 subContextPath = props.getProperty("SubContextPath")+fTopic;
747 /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
748 subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
750 protocol = props.getProperty("Protocol");
751 methodType = props.getProperty("MethodType");
752 dmeuser = props.getProperty("username");
753 dmepassword = props.getProperty("password");
754 contentType = props.getProperty("contenttype");
755 handlers = props.getProperty("sessionstickinessrequired");
756 routerFilePath= props.getProperty("DME2preferredRouterFilePath");
759 * Changes to DME2Client url to use Partner for auto failover between data centers
760 * When Partner value is not provided use the routeOffer value for auto failover within a cluster
764 String partitionKey = props.getProperty("partition");
766 if (partner != null && !partner.isEmpty() )
768 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
769 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
770 url = url + "&partitionKey=" + partitionKey;
773 else if (routeOffer!=null && !routeOffer.isEmpty())
775 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
776 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
777 url = url + "&partitionKey=" + partitionKey;
781 DMETimeOuts = new HashMap<String, String>();
782 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
783 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
784 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
785 DMETimeOuts.put("Content-Type", contentType);
786 System.setProperty("AFT_LATITUDE", latitude);
787 System.setProperty("AFT_LONGITUDE", longitude);
788 System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
789 //System.setProperty("DME2.DEBUG", "true");
790 // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
791 //System.out.println("XXXXXX"+url);
794 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
795 "SSLv3,TLSv1,TLSv1.1");
796 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
797 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
801 sender = new DME2Client(new URI(url), 5000L);
803 sender.setAllowAllHttpReturnCodes(true);
804 sender.setMethod(methodType);
805 sender.setSubContext(subContextPath);
806 sender.setCredentials(dmeuser, dmepassword);
807 sender.setHeaders(DMETimeOuts);
808 if(handlers.equalsIgnoreCase("yes")){
809 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
810 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
811 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
813 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
815 } catch (DME2Exception x) {
816 getLog().warn(x.getMessage(), x);
817 throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
818 } catch (URISyntaxException x) {
820 getLog().warn(x.getMessage(), x);
821 throw new URISyntaxException(url,x.getMessage());
822 } catch (Exception x) {
824 getLog().warn(x.getMessage(), x);
825 throw new Exception(x.getMessage());
829 private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
833 if ( topic == null || topic.length() < 1 )
835 throw new IllegalArgumentException ( "A topic must be provided." );
838 fHostSelector = new HostSelector(hosts, null);
841 fMaxBatchSize = maxBatchSize;
842 fMaxBatchAgeMs = maxBatchAgeMs;
843 fCompress = compress;
845 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
846 fDontSendUntilMs = 0;
847 fExec = new ScheduledThreadPoolExecutor ( 1 );
848 pubResponse = new MRPublisherResponse();
852 private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
856 if ( topic == null || topic.length() < 1 )
858 throw new IllegalArgumentException ( "A topic must be provided." );
861 fHostSelector = new HostSelector(hosts, null);
864 fMaxBatchSize = maxBatchSize;
865 fMaxBatchAgeMs = maxBatchAgeMs;
866 fCompress = compress;
867 threadOccuranceTime=httpThreadOccurnace;
868 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
869 fDontSendUntilMs = 0;
870 fExec = new ScheduledThreadPoolExecutor ( 1 );
871 fExec.scheduleAtFixedRate ( new Runnable()
878 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
881 private static class TimestampedMessage extends message
883 public TimestampedMessage ( message m )
886 timestamp = Clock.now();
888 public final long timestamp;
891 public String getUsername() {
895 public void setUsername(String username) {
896 this.username = username;
899 public String getPassword() {
903 public void setPassword(String password) {
904 this.password = password;
907 public String getHost() {
911 public void setHost(String host) {
915 public String getContentType() {
919 public void setContentType(String contentType) {
920 this.contentType = contentType;
923 public String getAuthKey() {
927 public void setAuthKey(String authKey) {
928 this.authKey = authKey;
931 public String getAuthDate() {
935 public void setAuthDate(String authDate) {
936 this.authDate = authDate;