1 /*******************************************************************************
 
   2  *  ============LICENSE_START=======================================================
 
   4  *  ================================================================================
 
   5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 
   6  *  ================================================================================
 
   7  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   8  *  you may not use this file except in compliance with the License.
 
   9  *  You may obtain a copy of the License at
 
  10  *        http://www.apache.org/licenses/LICENSE-2.0
 
  12  *  Unless required by applicable law or agreed to in writing, software
 
  13  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  15  *  See the License for the specific language governing permissions and
 
  16  *  limitations under the License.
 
  17  *  ============LICENSE_END=========================================================
 
  19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 
  21  *******************************************************************************/
 
  22 package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
 
  24 import java.io.ByteArrayOutputStream;
 
  25 import java.io.IOException;
 
  26 import java.io.OutputStream;
 
  27 import java.net.MalformedURLException;
 
  29 import java.net.URISyntaxException;
 
  30 import java.util.Collection;
 
  31 import java.util.HashMap;
 
  32 import java.util.LinkedList;
 
  33 import java.util.List;
 
  35 import java.util.Properties;
 
  36 import java.util.concurrent.LinkedBlockingQueue;
 
  37 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
  38 import java.util.concurrent.TimeUnit;
 
  39 import java.util.zip.GZIPOutputStream;
 
  41 import javax.ws.rs.core.MultivaluedMap;
 
  43 import org.apache.http.HttpException;
 
  44 import org.apache.http.HttpStatus;
 
  45 import org.json.JSONArray;
 
  46 import org.json.JSONObject;
 
  47 import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.HostSelector;
 
  48 import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
 
  49 import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse;
 
  50 import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants;
 
  52 import com.att.aft.dme2.api.DME2Client;
 
  53 import com.att.aft.dme2.api.DME2Exception;
 
  55 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
 
  57         public static class Builder 
 
  63                 public Builder againstUrls ( Collection<String> baseUrls )
 
  69                 public Builder onTopic ( String topic )
 
  75                 public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
 
  77                         fMaxBatchSize = maxBatchSize;
 
  78                         fMaxBatchAgeMs = maxBatchAgeMs;
 
  82                 public Builder compress ( boolean compress )
 
  88                 public Builder httpThreadTime ( int threadOccuranceTime )
 
  90                         this.threadOccuranceTime = threadOccuranceTime;
 
  94                 public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
 
  96                         fAllowSelfSignedCerts = allowSelfSignedCerts;
 
 100                 public Builder withResponse ( boolean withResponse)
 
 102                         fWithResponse = withResponse;
 
 105                 public MRSimplerBatchPublisher build ()
 
 110                                         return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
 
 111                                 } catch (MalformedURLException e) {
 
 112                                         throw new RuntimeException(e);
 
 117                                         return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
 
 118                                 } catch (MalformedURLException e) {
 
 119                                         throw new RuntimeException(e);
 
 125                 private Collection<String> fUrls;
 
 126                 private String fTopic;
 
 127                 private int fMaxBatchSize = 100;
 
 128                 private long fMaxBatchAgeMs = 1000;
 
 129                 private boolean fCompress = false;
 
 130                 private int threadOccuranceTime = 50;
 
 131                 private boolean fAllowSelfSignedCerts = false;
 
 132                 private boolean fWithResponse = false;
 
 137         public int send ( String partition, String msg )
 
 139                 return send ( new message ( partition, msg ) );
 
 142         public int send ( String msg )
 
 144                 return send ( new message ( null, msg ) );
 
 149         public int send ( message msg )
 
 151                 final LinkedList<message> list = new LinkedList<message> ();
 
 153                 return send ( list );
 
 159         public synchronized int send ( Collection<message> msgs )
 
 163                         throw new IllegalStateException ( "The publisher was closed." );
 
 166                 for ( message userMsg : msgs )
 
 168                         fPending.add ( new TimestampedMessage ( userMsg ) );
 
 170                 return getPendingMessageCount ();
 
 174         public synchronized int getPendingMessageCount ()
 
 176                 return fPending.size ();
 
 184                         final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
 
 185                         if ( remains.size() > 0 )
 
 187                                 getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
 
 188                                         + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
 
 191                 catch ( InterruptedException e )
 
 193                         getLog().warn ( "Possible message loss. " + e.getMessage(), e );
 
 195                 catch ( IOException e )
 
 197                         getLog().warn ( "Possible message loss. " + e.getMessage(), e );
 
 202         public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
 
 204                 synchronized ( this )
 
 208                         // stop the background sender
 
 209                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
 
 210                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
 
 214                 final long now = Clock.now ();
 
 215                 final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
 
 216                 final long timeoutAtMs = now + waitInMs;
 
 218                 while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
 
 221                         Thread.sleep ( 250 );
 
 224                 synchronized ( this )
 
 226                         final LinkedList<message> result = new LinkedList<message> ();
 
 227                         fPending.drainTo ( result );
 
 233          * Possibly send a batch to the MR server. This is called by the background thread
 
 234          * and the close() method
 
 238         private synchronized void send ( boolean force )
 
 240                 if ( force || shouldSendNow () )
 
 244                                 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
 
 246                                 // note the time for back-off
 
 247                                 fDontSendUntilMs = sfWaitAfterError + Clock.now ();
 
 252         private synchronized boolean shouldSendNow ()
 
 254                 boolean shouldSend = false;
 
 255                 if ( fPending.size () > 0 )
 
 257                         final long nowMs = Clock.now ();
 
 259                         shouldSend = ( fPending.size() >= fMaxBatchSize );
 
 262                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
 
 263                                 shouldSend = sendAtMs <= nowMs;
 
 266                         // however, wait after an error
 
 267                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs; 
 
 272         private synchronized boolean sendBatch ()
 
 274                 // it's possible for this call to be made with an empty list. in this case, just return.
 
 275                 if ( fPending.size() < 1 )
 
 280                 final long nowMs = Clock.now ();
 
 282                 host = this.fHostSelector.selectBaseHost();
 
 284                 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
 
 289                         /*final String contentType =
 
 291                                         MRFormat.CAMBRIA_ZIP.toString () :
 
 292                                         MRFormat.CAMBRIA.toString () 
 
 295                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
 
 296                         OutputStream os = baseStream;
 
 297                         final String contentType = props.getProperty("contenttype");
 
 298                         if(contentType.equalsIgnoreCase("application/json")){
 
 299                                 JSONArray jsonArray = new JSONArray();
 
 300                                 for ( TimestampedMessage m : fPending )
 
 302                                         JSONObject jsonObject = new JSONObject(m.fMsg);
 
 304                                                 jsonArray.put(jsonObject);
 
 307                                 os.write (jsonArray.toString().getBytes() );    
 
 310                                 }else if (contentType.equalsIgnoreCase("text/plain")){
 
 311                                         for ( TimestampedMessage m : fPending )
 
 313                                                 os.write ( m.fMsg.getBytes() );
 
 317                                 } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
 
 318                                         if ( contentType.equalsIgnoreCase("application/cambria-zip") )
 
 320                                                 os = new GZIPOutputStream ( baseStream );
 
 322                                         for ( TimestampedMessage m : fPending )
 
 325                                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
 
 327                                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
 
 329                                                 os.write ( m.fPartition.getBytes() );
 
 330                                                 os.write ( m.fMsg.getBytes() );
 
 335                                         for ( TimestampedMessage m : fPending )
 
 337                                                 os.write ( m.fMsg.getBytes() );
 
 345                         final long startMs = Clock.now ();
 
 346                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
 
 352                                 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
 
 353                                 sender.setPayload(os.toString());               
 
 354                                 String dmeResponse = sender.sendAndWait(5000L);
 
 356                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
 
 357                                                 + dmeResponse.toString();
 
 358                                 getLog().info(logLine);
 
 363                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
 
 364                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
 
 365                                 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
 
 366                                 //System.out.println(result.getInt("status"));
 
 367                                 //Here we are checking for error response. If HTTP status
 
 368                                 //code is not within the http success response code
 
 369                                 //then we consider this as error and return false
 
 370                                 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
 
 373                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
 
 374                                 getLog().info(logLine);
 
 379                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
 
 380                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
 
 381                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
 
 384                                 //System.out.println(result.getInt("status"));
 
 385                                 //Here we are checking for error response. If HTTP status
 
 386                                 //code is not within the http success response code
 
 387                                 //then we consider this as error and return false
 
 388                                 if(result.getInt("status") < 200 || result.getInt("status") > 299) {
 
 391                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
 
 392                                 getLog().info(logLine);
 
 397                 catch ( IllegalArgumentException x ) {
 
 398                         getLog().warn ( x.getMessage(), x );
 
 399                 } catch ( IOException x ) {
 
 400                         getLog().warn ( x.getMessage(), x );
 
 401                 } catch (HttpException x) {
 
 402                         getLog().warn ( x.getMessage(), x );
 
 403                 } catch (Exception x) {
 
 404                         getLog().warn(x.getMessage(), x);
 
 409         public synchronized MRPublisherResponse sendBatchWithResponse () 
 
 411                 // it's possible for this call to be made with an empty list. in this case, just return.
 
 412                 if ( fPending.size() < 1 )
 
 414                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 
 415                         pubResponse.setResponseMessage("No Messages to send");
 
 419                 final long nowMs = Clock.now ();
 
 421                 host = this.fHostSelector.selectBaseHost();
 
 423                 final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
 
 424                 OutputStream os=null;
 
 428                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
 
 430                         final String contentType = props.getProperty("contenttype");
 
 431                         if(contentType.equalsIgnoreCase("application/json")){
 
 432                                 JSONArray jsonArray = new JSONArray();
 
 433                                 for ( TimestampedMessage m : fPending )
 
 435                                         JSONObject jsonObject = new JSONObject(m.fMsg);
 
 437                                                 jsonArray.put(jsonObject);
 
 440                                 os.write (jsonArray.toString().getBytes() );    
 
 441                                 }else if (contentType.equalsIgnoreCase("text/plain")){
 
 442                                         for ( TimestampedMessage m : fPending )
 
 444                                                 os.write ( m.fMsg.getBytes() );
 
 447                                 } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
 
 448                                         if ( contentType.equalsIgnoreCase("application/cambria-zip") )
 
 450                                                 os = new GZIPOutputStream ( baseStream );
 
 452                                         for ( TimestampedMessage m : fPending )
 
 455                                                 os.write ( ( "" + m.fPartition.length () ).getBytes() );
 
 457                                                 os.write ( ( "" + m.fMsg.length () ).getBytes() );
 
 459                                                 os.write ( m.fPartition.getBytes() );
 
 460                                                 os.write ( m.fMsg.getBytes() );
 
 465                                         for ( TimestampedMessage m : fPending )
 
 467                                                 os.write ( m.fMsg.getBytes() );
 
 474                         final long startMs = Clock.now ();
 
 475                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
 
 482                                 getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
 
 483                                 sender.setPayload(os.toString());               
 
 486                                 String dmeResponse = sender.sendAndWait(5000L);
 
 487                                 System.out.println("dmeres->"+dmeResponse);             
 
 490                                 pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
 
 492                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
 
 496                                 final String logLine = String.valueOf((Clock.now() - startMs))
 
 497                                                 + dmeResponse.toString();
 
 498                                 getLog().info(logLine);
 
 502                                 catch (DME2Exception x) {
 
 503                                         getLog().warn(x.getMessage(), x);
 
 504                                         pubResponse.setResponseCode(x.getErrorCode());
 
 505                                         pubResponse.setResponseMessage(x.getErrorMessage());
 
 506                                 } catch (URISyntaxException x) {
 
 508                                         getLog().warn(x.getMessage(), x);
 
 509                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 
 510                                         pubResponse.setResponseMessage(x.getMessage());
 
 511                                 } catch (Exception x) {
 
 513                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 
 514                                         pubResponse.setResponseMessage(x.getMessage());
 
 521                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
 
 522                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
 
 523                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
 
 524                                 //System.out.println(result.getInt("status"));
 
 525                                 //Here we are checking for error response. If HTTP status
 
 526                                 //code is not within the http success response code
 
 527                                 //then we consider this as error and return false
 
 530                                 pubResponse = createMRPublisherResponse(result,pubResponse);
 
 532                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
 
 537                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
 
 538                                 getLog().info(logLine);
 
 543                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
 
 544                                 getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
 
 545                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
 
 547                                 //System.out.println(result.getInt("status"));
 
 548                                 //Here we are checking for error response. If HTTP status
 
 549                                 //code is not within the http success response code
 
 550                                 //then we consider this as error and return false
 
 551                                 pubResponse = createMRPublisherResponse(result,pubResponse);
 
 553                                 if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
 
 558                                 final String logLine = String.valueOf((Clock.now() - startMs));
 
 559                                 getLog().info(logLine);
 
 564                 catch ( IllegalArgumentException x ) {
 
 565                         getLog().warn ( x.getMessage(), x );
 
 566                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 
 567                         pubResponse.setResponseMessage(x.getMessage());
 
 569                 } catch ( IOException x ) {
 
 570                         getLog().warn ( x.getMessage(), x );
 
 571                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 
 572                         pubResponse.setResponseMessage(x.getMessage());
 
 574                 } catch (HttpException x) {
 
 575                         getLog().warn ( x.getMessage(), x );
 
 576                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 
 577                         pubResponse.setResponseMessage(x.getMessage());
 
 579                 } catch (Exception x) {
 
 580                         getLog().warn(x.getMessage(), x);
 
 582                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 
 583                         pubResponse.setResponseMessage(x.getMessage());
 
 588                         if (fPending.size()>0) {
 
 589                                 getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
 
 590                                 pubResponse.setPendingMsgs(fPending.size());
 
 595                                 } catch (Exception x) {
 
 596                                         getLog().warn(x.getMessage(), x);
 
 597                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 
 598                                         pubResponse.setResponseMessage("Error in closing Output Stream");
 
 606 private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
 
 611                  mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 
 612                  mrPubResponse.setResponseMessage("Please verify the Producer properties");
 
 614          else if(reply.startsWith("{"))
 
 616                         JSONObject jObject = new JSONObject(reply);
 
 617                         if(jObject.has("message") && jObject.has("status"))
 
 619                                 String message = jObject.getString("message");
 
 622                                         mrPubResponse.setResponseMessage(message);      
 
 624                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
 
 628                                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
 
 629                                         mrPubResponse.setResponseMessage(reply);        
 
 632          else if (reply.startsWith("<"))
 
 634                  String responseCode = getHTTPErrorResponseCode(reply);
 
 635                  if( responseCode.contains("403"))
 
 637                          responseCode = "403";
 
 639                  mrPubResponse.setResponseCode(responseCode);
 
 640                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));   
 
 643                 return mrPubResponse;
 
 646         private final String fTopic;
 
 647         private final int fMaxBatchSize;
 
 648         private final long fMaxBatchAgeMs;
 
 649         private final boolean fCompress;
 
 650         private int threadOccuranceTime;
 
 651         private boolean fClosed;
 
 652         private String username;
 
 653         private String password;
 
 657         private HostSelector fHostSelector = null;
 
 659         private final LinkedBlockingQueue<TimestampedMessage> fPending;
 
 660         private long fDontSendUntilMs;
 
 661         private final ScheduledThreadPoolExecutor fExec;
 
 663         private String latitude;
 
 664         private String longitude;
 
 665         private String version;
 
 666         private String serviceName;
 
 668         private String partner;
 
 669         private String routeOffer;
 
 670         private String subContextPath;
 
 671         private String protocol;
 
 672         private String methodType;
 
 674         private String dmeuser;
 
 675         private String dmepassword;
 
 676         private String contentType;
 
 677         private static final long sfWaitAfterError = 10000;
 
 678         private HashMap<String, String> DMETimeOuts;
 
 679         private DME2Client sender;
 
 680         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
 
 681         public String producerFilePath;
 
 682         private String authKey;
 
 683         private String authDate;
 
 684         private String handlers;
 
 685         private Properties props;
 
 686         public static String routerFilePath;
 
 687         public static Map<String, String> headers=new HashMap<String, String>();
 
 688         public static MultivaluedMap<String, Object> headersMap;
 
 691         private MRPublisherResponse pubResponse;
 
 693         public MRPublisherResponse getPubResponse() {
 
 696         public void setPubResponse(MRPublisherResponse pubResponse) {
 
 697                 this.pubResponse = pubResponse;
 
 700         public static String getRouterFilePath() {
 
 701                 return routerFilePath;
 
 704         public static void setRouterFilePath(String routerFilePath) {
 
 705                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
 
 708         public Properties getProps() {
 
 712         public void setProps(Properties props) {
 
 716         public String getProducerFilePath() {
 
 717                 return producerFilePath;
 
 720         public void setProducerFilePath(String producerFilePath) {
 
 721                 this.producerFilePath = producerFilePath;
 
 724         public String getProtocolFlag() {
 
 728         public void setProtocolFlag(String protocolFlag) {
 
 729                 this.protocolFlag = protocolFlag;
 
 733         private void DME2Configue() throws Exception {
 
 736                 /*      FileReader reader = new FileReader(new File (producerFilePath));
 
 737                         Properties props = new Properties();            
 
 738                         props.load(reader);*/
 
 739                         latitude = props.getProperty("Latitude");
 
 740                         longitude = props.getProperty("Longitude");
 
 741                         version = props.getProperty("Version");
 
 742                         serviceName = props.getProperty("ServiceName");
 
 743                         env = props.getProperty("Environment");
 
 744                         partner = props.getProperty("Partner");
 
 745                         routeOffer = props.getProperty("routeOffer");
 
 746                         subContextPath = props.getProperty("SubContextPath")+fTopic;
 
 747                         /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
 
 748                                 subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
 
 750                         protocol = props.getProperty("Protocol");
 
 751                         methodType = props.getProperty("MethodType");
 
 752                         dmeuser = props.getProperty("username");
 
 753                         dmepassword = props.getProperty("password");
 
 754                         contentType = props.getProperty("contenttype");
 
 755                         handlers = props.getProperty("sessionstickinessrequired");
 
 756                         routerFilePath= props.getProperty("DME2preferredRouterFilePath");
 
 759                          * Changes to DME2Client url to use Partner for auto failover between data centers
 
 760                          * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
 
 764                         String partitionKey = props.getProperty("partition");
 
 766                         if (partner != null && !partner.isEmpty() ) 
 
 768                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
 
 769                 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
 
 770                     url = url + "&partitionKey=" + partitionKey;
 
 773                         else if (routeOffer!=null && !routeOffer.isEmpty()) 
 
 775                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
 
 776                 if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
 
 777                     url = url + "&partitionKey=" + partitionKey;
 
 781                         DMETimeOuts = new HashMap<String, String>();
 
 782                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
 
 783                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
 
 784                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
 
 785                         DMETimeOuts.put("Content-Type", contentType);
 
 786                         System.setProperty("AFT_LATITUDE", latitude);
 
 787                         System.setProperty("AFT_LONGITUDE", longitude);
 
 788                         System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
 
 789                         //System.setProperty("DME2.DEBUG", "true");
 
 790                 //      System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
 
 791                         //System.out.println("XXXXXX"+url);
 
 794                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
 
 795                                         "SSLv3,TLSv1,TLSv1.1");
 
 796                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
 
 797                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
 
 801                         sender = new DME2Client(new URI(url), 5000L);
 
 803                         sender.setAllowAllHttpReturnCodes(true);
 
 804                         sender.setMethod(methodType);
 
 805                         sender.setSubContext(subContextPath);   
 
 806                         sender.setCredentials(dmeuser, dmepassword);
 
 807                         sender.setHeaders(DMETimeOuts);
 
 808                         if(handlers.equalsIgnoreCase("yes")){
 
 809                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
 
 810                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
 
 811                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
 
 813                                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client.HeaderReplyHandler");
 
 815                 } catch (DME2Exception x) {
 
 816                         getLog().warn(x.getMessage(), x);
 
 817                         throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
 
 818                 } catch (URISyntaxException x) {
 
 820                         getLog().warn(x.getMessage(), x);
 
 821                         throw new URISyntaxException(url,x.getMessage());
 
 822                 } catch (Exception x) {
 
 824                         getLog().warn(x.getMessage(), x);
 
 825                         throw new Exception(x.getMessage());
 
 /**
  * Private constructor: validates the topic and initialises the batching state.
  *
  * NOTE(review): this excerpt is missing lines (brace-only lines and possibly
  * statements between the visible ones), so only the visible statements are
  * described here.
  *
  * @param hosts         host collection handed to {@code HostSelector}
  * @param topic         topic name; must be non-null and non-empty
  * @param maxBatchSize  stored in {@code fMaxBatchSize} (presumably the maximum
  *                      number of messages per batch - confirm against the sender)
  * @param maxBatchAgeMs stored in {@code fMaxBatchAgeMs}
  * @param compress      stored in {@code fCompress}
  * @throws MalformedURLException declared; no visible statement throws it directly
  *                               (TODO confirm which call is the source)
  * @throws IllegalArgumentException if {@code topic} is null or empty
  */
 829         private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
 
                     // Fail fast on a null or zero-length topic.
 833                 if ( topic == null || topic.length() < 1 )
 
 835                         throw new IllegalArgumentException ( "A topic must be provided." );
 
                     // The second HostSelector argument is passed as null here.
 838                 fHostSelector = new HostSelector(hosts, null);
 
 841                 fMaxBatchSize = maxBatchSize;
 
 842                 fMaxBatchAgeMs = maxBatchAgeMs;
 
 843                 fCompress = compress;
 
                     // Start with an empty pending queue and no send hold-off.
 845                 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
 
 846                 fDontSendUntilMs = 0;
 
                     // Single-threaded scheduler. NOTE(review): no task is scheduled on it
                     // in the visible lines of this constructor - confirm this is intended.
 847                 fExec = new ScheduledThreadPoolExecutor ( 1 );
 
 848                 pubResponse = new MRPublisherResponse();
 
 /**
  * Private constructor: validates the topic, initialises the batching state and
  * schedules a recurring task on a single-threaded executor.
  *
  * NOTE(review): this excerpt is missing lines, including the body of the
  * scheduled {@code Runnable}, so what the task does is not documented here.
  *
  * @param hosts                host collection handed to {@code HostSelector}
  * @param topic                topic name; must be non-null and non-empty
  * @param maxBatchSize         stored in {@code fMaxBatchSize}
  * @param maxBatchAgeMs        stored in {@code fMaxBatchAgeMs}
  * @param compress             stored in {@code fCompress}
  * @param allowSelfSignedCerts NOTE(review): not referenced in any visible line of
  *                             this constructor - confirm whether it is used
  * @param httpThreadOccurnace  stored in {@code threadOccuranceTime} and used as the
  *                             period, in milliseconds, of the scheduled task
  * @throws MalformedURLException declared; no visible statement throws it directly
  *                               (TODO confirm which call is the source)
  * @throws IllegalArgumentException if {@code topic} is null or empty
  */
 852         private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
 
                     // Fail fast on a null or zero-length topic.
 856                 if ( topic == null || topic.length() < 1 )
 
 858                         throw new IllegalArgumentException ( "A topic must be provided." );
 
                     // The second HostSelector argument is passed as null here.
 861                 fHostSelector = new HostSelector(hosts, null);
 
 864                 fMaxBatchSize = maxBatchSize;
 
 865                 fMaxBatchAgeMs = maxBatchAgeMs;
 
 866                 fCompress = compress;
 
                     // NOTE(review): the value is not range-checked in the visible lines;
                     // scheduleAtFixedRate rejects a period <= 0 with IllegalArgumentException.
 867                 threadOccuranceTime=httpThreadOccurnace;
 
                     // Start with an empty pending queue and no send hold-off.
 868                 fPending = new LinkedBlockingQueue<TimestampedMessage> ();
 
 869                 fDontSendUntilMs = 0;
 
 870                 fExec = new ScheduledThreadPoolExecutor ( 1 );
 
                     // Recurring task: first run after 100 ms, then every threadOccuranceTime ms.
 871                 fExec.scheduleAtFixedRate ( new Runnable()
 
 878                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
 
 /**
  * A {@code message} subtype that records a timestamp taken at construction.
  *
  * NOTE(review): lines are missing from this excerpt; how the wrapped
  * message's content is passed to the superclass is not visible.
  */
 881         private static class TimestampedMessage extends message
 
                     /**
                      * @param m the message to wrap
                      */
 883                 public TimestampedMessage ( message m )
 
                             // Capture the creation time from Clock.now()
                             // (presumably epoch milliseconds - confirm against Clock).
 886                         timestamp = Clock.now();
 
                     /** Value of {@code Clock.now()} at construction; never reassigned. */
 888                 public final long timestamp;
 
 /**
  * Accessor for the username.
  * NOTE(review): the body is not visible in this excerpt; presumably it returns
  * the {@code username} field - confirm.
  */
 891         public String getUsername() {
 
 /**
  * Stores the given value in the {@code username} field.
  *
  * @param username the value to store; not validated in the visible lines
  */
 895         public void setUsername(String username) {
 
 896                 this.username = username;
 
 /**
  * Accessor for the password.
  * NOTE(review): the body is not visible in this excerpt; presumably it returns
  * the {@code password} field - confirm.
  */
 899         public String getPassword() {
 
 /**
  * Stores the given value in the {@code password} field.
  *
  * @param password the value to store; not validated in the visible lines
  */
 903         public void setPassword(String password) {
 
 904                 this.password = password;
 
 /**
  * Accessor for the host.
  * NOTE(review): the body is not visible in this excerpt; presumably it returns
  * a {@code host} field - confirm.
  */
 907         public String getHost() {
 
 /**
  * Mutator for the host.
  * NOTE(review): the body is not visible in this excerpt; presumably it assigns
  * {@code host} to a field of the same name - confirm.
  *
  * @param host the host value supplied by the caller
  */
 911         public void setHost(String host) {
 
 /**
  * Accessor for the content type.
  * NOTE(review): the body is not visible in this excerpt; presumably it returns
  * the {@code contentType} field - confirm.
  */
 915         public String getContentType() {
 
 /**
  * Stores the given value in the {@code contentType} field.
  *
  * @param contentType the value to store; not validated in the visible lines
  */
 919         public void setContentType(String contentType) {
 
 920                 this.contentType = contentType;
 
 /**
  * Accessor for the auth key.
  * NOTE(review): the body is not visible in this excerpt; presumably it returns
  * the {@code authKey} field - confirm.
  */
 923         public String getAuthKey() {
 
 /**
  * Stores the given value in the {@code authKey} field.
  *
  * @param authKey the value to store; not validated in the visible lines
  */
 927         public void setAuthKey(String authKey) {
 
 928                 this.authKey = authKey;
 
 /**
  * Accessor for the auth date.
  * NOTE(review): the body is not visible in this excerpt; presumably it returns
  * the {@code authDate} field - confirm.
  */
 931         public String getAuthDate() {
 
 935         public void setAuthDate(String authDate) {
 
 936                 this.authDate = authDate;