1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
41 import javax.ws.rs.core.MultivaluedMap;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
51 import com.att.aft.dme2.api.DME2Client;
52 import com.att.aft.dme2.api.DME2Exception;
53 import com.att.nsa.mr.client.HostSelector;
54 import com.att.nsa.mr.client.MRBatchingPublisher;
55 import com.att.nsa.mr.client.response.MRPublisherResponse;
56 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
58 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
60 private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
62 public static class Builder
68 public Builder againstUrls ( Collection<String> baseUrls )
74 public Builder onTopic ( String topic )
80 public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
82 fMaxBatchSize = maxBatchSize;
83 fMaxBatchAgeMs = maxBatchAgeMs;
87 public Builder compress ( boolean compress )
93 public Builder httpThreadTime ( int threadOccuranceTime )
95 this.threadOccuranceTime = threadOccuranceTime;
99 public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
101 fAllowSelfSignedCerts = allowSelfSignedCerts;
105 public Builder withResponse ( boolean withResponse)
107 fWithResponse = withResponse;
110 public MRSimplerBatchPublisher build ()
115 return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
116 } catch (MalformedURLException e) {
117 throw new RuntimeException(e);
122 return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
123 } catch (MalformedURLException e) {
124 throw new RuntimeException(e);
130 private Collection<String> fUrls;
131 private String fTopic;
132 private int fMaxBatchSize = 100;
133 private long fMaxBatchAgeMs = 1000;
134 private boolean fCompress = false;
135 private int threadOccuranceTime = 50;
136 private boolean fAllowSelfSignedCerts = false;
137 private boolean fWithResponse = false;
142 public int send ( String partition, String msg )
144 return send ( new message ( partition, msg ) );
147 public int send ( String msg )
149 return send ( new message ( null, msg ) );
154 public int send ( message msg )
156 final LinkedList<message> list = new LinkedList<message> ();
158 return send ( list );
164 public synchronized int send ( Collection<message> msgs )
168 throw new IllegalStateException ( "The publisher was closed." );
171 for ( message userMsg : msgs )
173 fPending.add ( new TimestampedMessage ( userMsg ) );
175 return getPendingMessageCount ();
179 public synchronized int getPendingMessageCount ()
181 return fPending.size ();
189 final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
190 if ( remains.size() > 0 )
192 getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
193 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
196 catch ( InterruptedException e )
198 getLog().warn ( "Possible message loss. " + e.getMessage(), e );
200 catch ( IOException e )
202 getLog().warn ( "Possible message loss. " + e.getMessage(), e );
207 public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
209 synchronized ( this )
213 // stop the background sender
214 fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
215 fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
219 final long now = Clock.now ();
220 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
221 final long timeoutAtMs = now + waitInMs;
223 while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
226 Thread.sleep ( 250 );
229 synchronized ( this )
231 final LinkedList<message> result = new LinkedList<message> ();
232 fPending.drainTo ( result );
238 * Possibly send a batch to the MR server. This is called by the background thread
239 * and the close() method
243 private synchronized void send ( boolean force )
245 if ( force || shouldSendNow () )
249 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
251 // note the time for back-off
252 fDontSendUntilMs = sfWaitAfterError + Clock.now ();
257 private synchronized boolean shouldSendNow ()
259 boolean shouldSend = false;
260 if ( fPending.size () > 0 )
262 final long nowMs = Clock.now ();
264 shouldSend = ( fPending.size() >= fMaxBatchSize );
267 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
268 shouldSend = sendAtMs <= nowMs;
271 // however, wait after an error
272 shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
277 private synchronized boolean sendBatch ()
279 // it's possible for this call to be made with an empty list. in this case, just return.
280 if ( fPending.size() < 1 )
285 final long nowMs = Clock.now ();
287 host = this.fHostSelector.selectBaseHost();
289 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
294 /*final String contentType =
296 MRFormat.CAMBRIA_ZIP.toString () :
297 MRFormat.CAMBRIA.toString ()
300 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
301 OutputStream os = baseStream;
302 final String contentType = props.getProperty("contenttype");
303 if(contentType.equalsIgnoreCase("application/json")){
304 JSONArray jsonArray = new JSONArray();
305 for ( TimestampedMessage m : fPending )
307 JSONObject jsonObject = new JSONObject(m.fMsg);
309 jsonArray.put(jsonObject);
312 os.write (jsonArray.toString().getBytes() );
315 }else if (contentType.equalsIgnoreCase("text/plain")){
316 for ( TimestampedMessage m : fPending )
318 os.write ( m.fMsg.getBytes() );
322 } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
323 if ( contentType.equalsIgnoreCase("application/cambria-zip") )
325 os = new GZIPOutputStream ( baseStream );
327 for ( TimestampedMessage m : fPending )
330 os.write ( ( "" + m.fPartition.length () ).getBytes() );
332 os.write ( ( "" + m.fMsg.length () ).getBytes() );
334 os.write ( m.fPartition.getBytes() );
335 os.write ( m.fMsg.getBytes() );
340 for ( TimestampedMessage m : fPending )
342 os.write ( m.fMsg.getBytes() );
350 final long startMs = Clock.now ();
351 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
357 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
358 sender.setPayload(os.toString());
359 String dmeResponse = sender.sendAndWait(5000L);
361 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
362 + dmeResponse.toString();
363 getLog().info(logLine);
368 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
369 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
370 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
371 //System.out.println(result.getInt("status"));
372 //Here we are checking for error response. If HTTP status
373 //code is not within the http success response code
374 //then we consider this as error and return false
375 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
378 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
379 getLog().info(logLine);
384 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
385 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
386 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
389 //System.out.println(result.getInt("status"));
390 //Here we are checking for error response. If HTTP status
391 //code is not within the http success response code
392 //then we consider this as error and return false
393 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
396 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
397 getLog().info(logLine);
402 catch ( IllegalArgumentException x ) {
403 getLog().warn ( x.getMessage(), x );
404 } catch ( IOException x ) {
405 getLog().warn ( x.getMessage(), x );
406 } catch (HttpException x) {
407 getLog().warn ( x.getMessage(), x );
408 } catch (Exception x) {
409 getLog().warn(x.getMessage(), x);
414 public synchronized MRPublisherResponse sendBatchWithResponse ()
416 // it's possible for this call to be made with an empty list. in this case, just return.
417 if ( fPending.size() < 1 )
419 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
420 pubResponse.setResponseMessage("No Messages to send");
424 final long nowMs = Clock.now ();
426 host = this.fHostSelector.selectBaseHost();
428 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
429 OutputStream os=null;
433 final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
435 final String contentType = props.getProperty("contenttype");
436 if(contentType.equalsIgnoreCase("application/json")){
437 JSONArray jsonArray = new JSONArray();
438 for ( TimestampedMessage m : fPending )
440 JSONObject jsonObject = new JSONObject(m.fMsg);
442 jsonArray.put(jsonObject);
445 os.write (jsonArray.toString().getBytes() );
446 }else if (contentType.equalsIgnoreCase("text/plain")){
447 for ( TimestampedMessage m : fPending )
449 os.write ( m.fMsg.getBytes() );
452 } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
453 if ( contentType.equalsIgnoreCase("application/cambria-zip") )
455 os = new GZIPOutputStream ( baseStream );
457 for ( TimestampedMessage m : fPending )
460 os.write ( ( "" + m.fPartition.length () ).getBytes() );
462 os.write ( ( "" + m.fMsg.length () ).getBytes() );
464 os.write ( m.fPartition.getBytes() );
465 os.write ( m.fMsg.getBytes() );
470 for ( TimestampedMessage m : fPending )
472 os.write ( m.fMsg.getBytes() );
479 final long startMs = Clock.now ();
480 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
487 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
488 sender.setPayload(os.toString());
491 String dmeResponse = sender.sendAndWait(5000L);
492 System.out.println("dmeres->"+dmeResponse);
495 pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
497 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
501 final String logLine = String.valueOf((Clock.now() - startMs))
502 + dmeResponse.toString();
503 getLog().info(logLine);
507 catch (DME2Exception x) {
508 getLog().warn(x.getMessage(), x);
509 pubResponse.setResponseCode(x.getErrorCode());
510 pubResponse.setResponseMessage(x.getErrorMessage());
511 } catch (URISyntaxException x) {
513 getLog().warn(x.getMessage(), x);
514 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
515 pubResponse.setResponseMessage(x.getMessage());
516 } catch (Exception x) {
518 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
519 pubResponse.setResponseMessage(x.getMessage());
520 logger.error("exception: ", x);
527 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
528 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
529 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
530 //System.out.println(result.getInt("status"));
531 //Here we are checking for error response. If HTTP status
532 //code is not within the http success response code
533 //then we consider this as error and return false
536 pubResponse = createMRPublisherResponse(result,pubResponse);
538 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
543 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
544 getLog().info(logLine);
549 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
550 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
551 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
553 //System.out.println(result.getInt("status"));
554 //Here we are checking for error response. If HTTP status
555 //code is not within the http success response code
556 //then we consider this as error and return false
557 pubResponse = createMRPublisherResponse(result,pubResponse);
559 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
564 final String logLine = String.valueOf((Clock.now() - startMs));
565 getLog().info(logLine);
570 catch ( IllegalArgumentException x ) {
571 getLog().warn ( x.getMessage(), x );
572 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
573 pubResponse.setResponseMessage(x.getMessage());
575 } catch ( IOException x ) {
576 getLog().warn ( x.getMessage(), x );
577 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
578 pubResponse.setResponseMessage(x.getMessage());
580 } catch (HttpException x) {
581 getLog().warn ( x.getMessage(), x );
582 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
583 pubResponse.setResponseMessage(x.getMessage());
585 } catch (Exception x) {
586 getLog().warn(x.getMessage(), x);
588 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
589 pubResponse.setResponseMessage(x.getMessage());
594 if (fPending.size()>0) {
595 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
596 pubResponse.setPendingMsgs(fPending.size());
601 } catch (Exception x) {
602 getLog().warn(x.getMessage(), x);
603 pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
604 pubResponse.setResponseMessage("Error in closing Output Stream");
612 private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
617 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
618 mrPubResponse.setResponseMessage("Please verify the Producer properties");
620 else if(reply.startsWith("{"))
622 JSONObject jObject = new JSONObject(reply);
623 if(jObject.has("message") && jObject.has("status"))
625 String message = jObject.getString("message");
628 mrPubResponse.setResponseMessage(message);
630 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
634 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
635 mrPubResponse.setResponseMessage(reply);
638 else if (reply.startsWith("<"))
640 String responseCode = getHTTPErrorResponseCode(reply);
641 if( responseCode.contains("403"))
643 responseCode = "403";
645 mrPubResponse.setResponseCode(responseCode);
646 mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
649 return mrPubResponse;
652 private final String fTopic;
653 private final int fMaxBatchSize;
654 private final long fMaxBatchAgeMs;
655 private final boolean fCompress;
656 private int threadOccuranceTime;
657 private boolean fClosed;
658 private String username;
659 private String password;
663 private HostSelector fHostSelector = null;
665 private final LinkedBlockingQueue<TimestampedMessage> fPending;
666 private long fDontSendUntilMs;
667 private final ScheduledThreadPoolExecutor fExec;
669 private String latitude;
670 private String longitude;
671 private String version;
672 private String serviceName;
674 private String partner;
675 private String routeOffer;
676 private String subContextPath;
677 private String protocol;
678 private String methodType;
680 private String dmeuser;
681 private String dmepassword;
682 private String contentType;
683 private static final long sfWaitAfterError = 10000;
684 private HashMap<String, String> DMETimeOuts;
685 private DME2Client sender;
686 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
687 public String producerFilePath;
688 private String authKey;
689 private String authDate;
690 private String handlers;
691 private Properties props;
692 public static String routerFilePath;
693 protected static final Map<String, String> headers=new HashMap<String, String>();
694 public static MultivaluedMap<String, Object> headersMap;
697 private MRPublisherResponse pubResponse;
699 public MRPublisherResponse getPubResponse() {
702 public void setPubResponse(MRPublisherResponse pubResponse) {
703 this.pubResponse = pubResponse;
706 public static String getRouterFilePath() {
707 return routerFilePath;
710 public static void setRouterFilePath(String routerFilePath) {
711 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
714 public Properties getProps() {
718 public void setProps(Properties props) {
722 public String getProducerFilePath() {
723 return producerFilePath;
726 public void setProducerFilePath(String producerFilePath) {
727 this.producerFilePath = producerFilePath;
730 public String getProtocolFlag() {
734 public void setProtocolFlag(String protocolFlag) {
735 this.protocolFlag = protocolFlag;
739 private void DME2Configue() throws Exception {
742 /* FileReader reader = new FileReader(new File (producerFilePath));
743 Properties props = new Properties();
744 props.load(reader);*/
745 latitude = props.getProperty("Latitude");
746 longitude = props.getProperty("Longitude");
747 version = props.getProperty("Version");
748 serviceName = props.getProperty("ServiceName");
749 env = props.getProperty("Environment");
750 partner = props.getProperty("Partner");
751 routeOffer = props.getProperty("routeOffer");
752 subContextPath = props.getProperty("SubContextPath")+fTopic;
753 /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
754 subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
756 protocol = props.getProperty("Protocol");
757 methodType = props.getProperty("MethodType");
758 dmeuser = props.getProperty("username");
759 dmepassword = props.getProperty("password");
760 contentType = props.getProperty("contenttype");
761 handlers = props.getProperty("sessionstickinessrequired");
762 routerFilePath= props.getProperty("DME2preferredRouterFilePath");
765 * Changes to DME2Client url to use Partner for auto failover between data centers
766 * When Partner value is not provided use the routeOffer value for auto failover within a cluster
770 String partitionKey = props.getProperty("partition");
772 if (partner != null && !partner.isEmpty() )
774 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
775 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
776 url = url + "&partitionKey=" + partitionKey;
779 else if (routeOffer!=null && !routeOffer.isEmpty())
781 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
782 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
783 url = url + "&partitionKey=" + partitionKey;
787 DMETimeOuts = new HashMap<String, String>();
788 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
789 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
790 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
791 DMETimeOuts.put("Content-Type", contentType);
792 System.setProperty("AFT_LATITUDE", latitude);
793 System.setProperty("AFT_LONGITUDE", longitude);
794 System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
795 //System.setProperty("DME2.DEBUG", "true");
796 // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
797 //System.out.println("XXXXXX"+url);
800 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
801 "SSLv3,TLSv1,TLSv1.1");
802 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
803 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
807 sender = new DME2Client(new URI(url), 5000L);
809 sender.setAllowAllHttpReturnCodes(true);
810 sender.setMethod(methodType);
811 sender.setSubContext(subContextPath);
812 sender.setCredentials(dmeuser, dmepassword);
813 sender.setHeaders(DMETimeOuts);
814 if(handlers.equalsIgnoreCase("yes")){
815 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
816 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
817 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
819 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
821 } catch (DME2Exception x) {
822 getLog().warn(x.getMessage(), x);
823 throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
824 } catch (URISyntaxException x) {
826 getLog().warn(x.getMessage(), x);
827 throw new URISyntaxException(url,x.getMessage());
828 } catch (Exception x) {
830 getLog().warn(x.getMessage(), x);
831 throw new Exception(x.getMessage());
835 private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
839 if ( topic == null || topic.length() < 1 )
841 throw new IllegalArgumentException ( "A topic must be provided." );
844 fHostSelector = new HostSelector(hosts, null);
847 fMaxBatchSize = maxBatchSize;
848 fMaxBatchAgeMs = maxBatchAgeMs;
849 fCompress = compress;
851 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
852 fDontSendUntilMs = 0;
853 fExec = new ScheduledThreadPoolExecutor ( 1 );
854 pubResponse = new MRPublisherResponse();
858 private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
862 if ( topic == null || topic.length() < 1 )
864 throw new IllegalArgumentException ( "A topic must be provided." );
867 fHostSelector = new HostSelector(hosts, null);
870 fMaxBatchSize = maxBatchSize;
871 fMaxBatchAgeMs = maxBatchAgeMs;
872 fCompress = compress;
873 threadOccuranceTime=httpThreadOccurnace;
874 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
875 fDontSendUntilMs = 0;
876 fExec = new ScheduledThreadPoolExecutor ( 1 );
877 fExec.scheduleAtFixedRate ( new Runnable()
884 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
887 private static class TimestampedMessage extends message
889 public TimestampedMessage ( message m )
892 timestamp = Clock.now();
894 public final long timestamp;
897 public String getUsername() {
901 public void setUsername(String username) {
902 this.username = username;
905 public String getPassword() {
909 public void setPassword(String password) {
910 this.password = password;
913 public String getHost() {
917 public void setHost(String host) {
921 public String getContentType() {
925 public void setContentType(String contentType) {
926 this.contentType = contentType;
929 public String getAuthKey() {
933 public void setAuthKey(String authKey) {
934 this.authKey = authKey;
937 public String getAuthDate() {
941 public void setAuthDate(String authDate) {
942 this.authDate = authDate;