Sonar major issues
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
index ad925b1..ef0d13a 100644 (file)
@@ -40,10 +40,14 @@ import java.util.zip.GZIPOutputStream;
 
 import javax.ws.rs.core.MultivaluedMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.http.HttpException;
 import org.apache.http.HttpStatus;
 import org.json.JSONArray;
 import org.json.JSONObject;
+import org.json.JSONTokener;
 
 import com.att.aft.dme2.api.DME2Client;
 import com.att.aft.dme2.api.DME2Exception;
@@ -52,77 +56,79 @@ import com.att.nsa.mr.client.MRBatchingPublisher;
 import com.att.nsa.mr.client.response.MRPublisherResponse;
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 
-public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
-{
-       public static class Builder 
-       {
-               public Builder ()
-               {
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
+       private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
+
+       public static class Builder {
+               public Builder() {
                }
 
-               public Builder againstUrls ( Collection<String> baseUrls )
-               {
+               public Builder againstUrls(Collection<String> baseUrls) {
                        fUrls = baseUrls;
                        return this;
                }
+               
+               public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )            
+               {               
+                       fUrls = baseUrls;               
+                       fServiceName = serviceName;             
+                       fTransportype = transportype;           
+                       return this;            
+               }
 
-               public Builder onTopic ( String topic )
-               {
+               public Builder onTopic(String topic) {
                        fTopic = topic;
                        return this;
                }
 
-               public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
-               {
+               public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
                        fMaxBatchSize = maxBatchSize;
                        fMaxBatchAgeMs = maxBatchAgeMs;
                        return this;
                }
 
-               public Builder compress ( boolean compress )
-               {
+               public Builder compress(boolean compress) {
                        fCompress = compress;
                        return this;
                }
-               
-               public Builder httpThreadTime ( int threadOccuranceTime )
-               {
+
+               public Builder httpThreadTime(int threadOccuranceTime) {
                        this.threadOccuranceTime = threadOccuranceTime;
                        return this;
                }
-               
-               public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
-               {
+
+               public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
                        fAllowSelfSignedCerts = allowSelfSignedCerts;
                        return this;
                }
-               
-               public Builder withResponse ( boolean withResponse)
-               {
+
+               public Builder withResponse(boolean withResponse) {
                        fWithResponse = withResponse;
                        return this;
                }
-               public MRSimplerBatchPublisher build ()
-               {
-                       if(!fWithResponse) 
-                       {
+
+               public MRSimplerBatchPublisher build() {
+                       if (!fWithResponse) {
                                try {
-                                       return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
+                                       return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+                                                       fAllowSelfSignedCerts, threadOccuranceTime);
                                } catch (MalformedURLException e) {
-                                       throw new RuntimeException(e);
+                                       throw new IllegalArgumentException(e);
                                }
-                       } else 
-                       {
+                       } else {
                                try {
-                                       return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
+                                       return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+                                                       fAllowSelfSignedCerts, fMaxBatchSize);
                                } catch (MalformedURLException e) {
-                                       throw new RuntimeException(e);
+                                       throw new IllegalArgumentException(e);
                                }
                        }
-                               
+
                }
 
                private Collection<String> fUrls;
+               private Collection<String> fServiceName;                
+               private String fTransportype;   
                private String fTopic;
                private int fMaxBatchSize = 100;
                private long fMaxBatchAgeMs = 1000;
@@ -130,262 +136,249 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                private int threadOccuranceTime = 50;
                private boolean fAllowSelfSignedCerts = false;
                private boolean fWithResponse = false;
-               
+
        };
 
        @Override
-       public int send ( String partition, String msg )
-       {
-               return send ( new message ( partition, msg ) );
+       public int send(String partition, String msg) {
+               return send(new message(partition, msg));
        }
+
        @Override
-       public int send ( String msg )
-       {
-               return send ( new message ( null, msg ) );
+       public int send(String msg) {
+               return send(new message(null, msg));
        }
 
-
        @Override
-       public int send ( message msg )
-       {
-               final LinkedList<message> list = new LinkedList<message> ();
-               list.add ( msg );
-               return send ( list );
+       public int send(message msg) {
+               final LinkedList<message> list = new LinkedList<message>();
+               list.add(msg);
+               return send(list);
        }
-       
-       
 
        @Override
-       public synchronized int send ( Collection<message> msgs )
-       {
-               if ( fClosed )
-               {
-                       throw new IllegalStateException ( "The publisher was closed." );
+       public synchronized int send(Collection<message> msgs) {
+               if (fClosed) {
+                       throw new IllegalStateException("The publisher was closed.");
                }
-               
-               for ( message userMsg : msgs )
-               {
-                       fPending.add ( new TimestampedMessage ( userMsg ) );
+
+               for (message userMsg : msgs) {
+                       fPending.add(new TimestampedMessage(userMsg));
                }
-               return getPendingMessageCount ();
+               return getPendingMessageCount();
        }
 
        @Override
-       public synchronized int getPendingMessageCount ()
-       {
-               return fPending.size ();
+       public synchronized int getPendingMessageCount() {
+               return fPending.size();
        }
 
        @Override
-       public void close ()
-       {
-               try
-               {
-                       final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
-                       if ( remains.size() > 0 )
-                       {
-                               getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
-                                       + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
+       public void close() {
+               try {
+                       final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+                       if (remains.isEmpty()) {
+                               getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+                                               + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
                        }
-               }
-               catch ( InterruptedException e )
-               {
-                       getLog().warn ( "Possible message loss. " + e.getMessage(), e );
-               }
-               catch ( IOException e )
-               {
-                       getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+               } catch (InterruptedException e) {
+                       getLog().warn("Possible message loss. " + e.getMessage(), e);
+                       Thread.currentThread().interrupt();
+               } catch (IOException e) {
+                       getLog().warn("Possible message loss. " + e.getMessage(), e);
                }
        }
 
        @Override
-       public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
-       {
-               synchronized ( this )
-               {
+       public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+               synchronized (this) {
                        fClosed = true;
 
                        // stop the background sender
-                       fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
-                       fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
-                       fExec.shutdown ();
+                       fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+                       fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+                       fExec.shutdown();
                }
 
-               final long now = Clock.now ();
-               final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+               final long now = Clock.now();
+               final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
                final long timeoutAtMs = now + waitInMs;
 
-               while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
-               {
-                       send ( true );
-                       Thread.sleep ( 250 );
+               while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+                       send(true);
+                       Thread.sleep(250);
                }
 
-               synchronized ( this )
-               {
-                       final LinkedList<message> result = new LinkedList<message> ();
-                       fPending.drainTo ( result );
+               synchronized (this) {
+                       final LinkedList<message> result = new LinkedList<message>();
+                       fPending.drainTo(result);
                        return result;
                }
        }
 
        /**
-        * Possibly send a batch to the MR server. This is called by the background thread
-        * and the close() method
+        * Possibly send a batch to the MR server. This is called by the background
+        * thread and the close() method
         * 
         * @param force
         */
-       private synchronized void send ( boolean force )
-       {
-               if ( force || shouldSendNow () )
-               {
-                       if ( !sendBatch () )
-                       {
-                               getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+       private synchronized void send(boolean force) {
+               if (force || shouldSendNow()) {
+                       if (!sendBatch()) {
+                               getLog().warn("Send failed, " + fPending.size() + " message to send.");
 
                                // note the time for back-off
-                               fDontSendUntilMs = sfWaitAfterError + Clock.now ();
+                               fDontSendUntilMs = sfWaitAfterError + Clock.now();
                        }
                }
        }
 
-       private synchronized boolean shouldSendNow ()
-       {
+       private synchronized boolean shouldSendNow() {
                boolean shouldSend = false;
-               if ( fPending.size () > 0 )
-               {
-                       final long nowMs = Clock.now ();
+               if (fPending.isEmpty()) {
+                       final long nowMs = Clock.now();
 
-                       shouldSend = ( fPending.size() >= fMaxBatchSize );
-                       if ( !shouldSend )
-                       {
+                       shouldSend = (fPending.size() >= fMaxBatchSize);
+                       if (!shouldSend) {
                                final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
                                shouldSend = sendAtMs <= nowMs;
                        }
 
                        // however, wait after an error
-                       shouldSend = shouldSend && nowMs >= fDontSendUntilMs; 
+                       shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
                }
                return shouldSend;
        }
 
-       private synchronized boolean sendBatch ()
-       {
-               // it's possible for this call to be made with an empty list. in this case, just return.
-               if ( fPending.size() < 1 )
-               {
+       /**
+        * Method to parse published JSON Objects and Arrays
+        * 
+        * @return JSONArray
+        */
+       private JSONArray parseJSON() {
+               JSONArray jsonArray = new JSONArray();
+               for (TimestampedMessage m : fPending) {
+                       JSONTokener jsonTokener = new JSONTokener(m.fMsg);
+                       JSONObject jsonObject = null;
+                       JSONArray tempjsonArray = null;
+                       final char firstChar = jsonTokener.next();
+                       jsonTokener.back();
+                       if ('[' == firstChar) {
+                               tempjsonArray = new JSONArray(jsonTokener);
+                               if (null != tempjsonArray) {
+                                       for (int i = 0; i < tempjsonArray.length(); i++) {
+                                               jsonArray.put(tempjsonArray.getJSONObject(i));
+                                       }
+                               }
+                       } else {
+                               jsonObject = new JSONObject(jsonTokener);
+                               jsonArray.put(jsonObject);
+                       }
+
+               }
+               return jsonArray;
+       }
+
+       private synchronized boolean sendBatch() {
+               // it's possible for this call to be made with an empty list. in this
+               // case, just return.
+               if (fPending.size() < 1) {
                        return true;
                }
 
-               final long nowMs = Clock.now ();
-               
-               host = this.fHostSelector.selectBaseHost();
-               
-               final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-               
+               final long nowMs = Clock.now();
+
+               if (this.fHostSelector != null) {
+                       host = this.fHostSelector.selectBaseHost();
+               }
+
+               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+                               props.getProperty("partition"));
 
-               try
-               {
-                       /*final String contentType =
-                               fCompress ?
-                                       MRFormat.CAMBRIA_ZIP.toString () :
-                                       MRFormat.CAMBRIA.toString () 
-                       ;*/
-            
-                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+               try {
+                       
+                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
                        OutputStream os = baseStream;
                        final String contentType = props.getProperty("contenttype");
-                       if(contentType.equalsIgnoreCase("application/json")){
-                               JSONArray jsonArray = new JSONArray();
-                               for ( TimestampedMessage m : fPending )
-                               {
-                                       JSONObject jsonObject = new JSONObject(m.fMsg);
-                                                               
-                                               jsonArray.put(jsonObject);
-                               
+                       if (contentType.equalsIgnoreCase("application/json")) {
+                               JSONArray jsonArray = parseJSON();
+                               os.write(jsonArray.toString().getBytes());
+                               os.close();
+
+                       } else if (contentType.equalsIgnoreCase("text/plain")) {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
                                }
-                               os.write (jsonArray.toString().getBytes() );    
                                os.close();
+                       } else if (contentType.equalsIgnoreCase("application/cambria")
+                                       || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+                               if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+                                       os = new GZIPOutputStream(baseStream);
+                               }
+                               for (TimestampedMessage m : fPending) {
+
+                                       os.write(("" + m.fPartition.length()).getBytes());
+                                       os.write('.');
+                                       os.write(("" + m.fMsg.length()).getBytes());
+                                       os.write('.');
+                                       os.write(m.fPartition.getBytes());
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
+                               }
+                               os.close();
+                       } else {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
 
-                               }else if (contentType.equalsIgnoreCase("text/plain")){
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                                       os.close ();
-                               } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
-                                       if ( contentType.equalsIgnoreCase("application/cambria-zip") )
-                                       {
-                                               os = new GZIPOutputStream ( baseStream );
-                                       }
-                                       for ( TimestampedMessage m : fPending )
-                                       {
-                                               
-                                               os.write ( ( "" + m.fPartition.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( ( "" + m.fMsg.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( m.fPartition.getBytes() );
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                                       os.close ();
-                               }else{
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                       
-                                       }
-                                       os.close ();
                                }
-             
-               
+                               os.close();
+                       }
 
-                       final long startMs = Clock.now ();
+                       final long startMs = Clock.now();
                        if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-                               
-                       
+
                                DME2Configue();
-                               
+
                                Thread.sleep(5);
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               sender.setPayload(os.toString());               
+                               getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               sender.setPayload(os.toString());
                                String dmeResponse = sender.sendAndWait(5000L);
-                               
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
-                                               + dmeResponse.toString();
+
+                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
                                getLog().info(logLine);
                                fPending.clear();
                                return true;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
+                                               username, password, protocolFlag);
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
                                        return false;
                                }
                                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
                                getLog().info(logLine);
                                fPending.clear();
                                return true;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-                               
-                               
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
+                                               protocolFlag);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
                                        return false;
                                }
                                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
@@ -393,118 +386,117 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                fPending.clear();
                                return true;
                        }
-               }
-               catch ( IllegalArgumentException x ) {
-                       getLog().warn ( x.getMessage(), x );
-               } catch ( IOException x ) {
-                       getLog().warn ( x.getMessage(), x );
+                       
+                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+                                       return false;
+                               }
+                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+                               getLog().info(logLine);
+                               fPending.clear();
+                               return true;
+                       }
+               } catch (IllegalArgumentException x) {
+                       getLog().warn(x.getMessage(), x);
+               } catch (IOException x) {
+                       getLog().warn(x.getMessage(), x);
                } catch (HttpException x) {
-                       getLog().warn ( x.getMessage(), x );
+                       getLog().warn(x.getMessage(), x);
                } catch (Exception x) {
                        getLog().warn(x.getMessage(), x);
                }
                return false;
        }
 
-       public synchronized MRPublisherResponse sendBatchWithResponse () 
-       {
-               // it's possible for this call to be made with an empty list. in this case, just return.
-               if ( fPending.size() < 1 )
-               {
+       public synchronized MRPublisherResponse sendBatchWithResponse() {
+               // it's possible for this call to be made with an empty list. in this
+               // case, just return.
+               if (fPending.size() < 1) {
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage("No Messages to send");
                        return pubResponse;
                }
 
-               final long nowMs = Clock.now ();
-               
+               final long nowMs = Clock.now();
+
                host = this.fHostSelector.selectBaseHost();
-               
-               final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-               OutputStream os=null;
-               try
-               {
-                       
-                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
-                        os = baseStream;
+
+               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+                               props.getProperty("partition"));
+               OutputStream os = null;
+               try {
+
+                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+                       os = baseStream;
                        final String contentType = props.getProperty("contenttype");
-                       if(contentType.equalsIgnoreCase("application/json")){
-                               JSONArray jsonArray = new JSONArray();
-                               for ( TimestampedMessage m : fPending )
-                               {
-                                       JSONObject jsonObject = new JSONObject(m.fMsg);
-                                                               
-                                               jsonArray.put(jsonObject);
-                               
+                       if (contentType.equalsIgnoreCase("application/json")) {
+                               JSONArray jsonArray = parseJSON();
+                               os.write(jsonArray.toString().getBytes());
+                       } else if (contentType.equalsIgnoreCase("text/plain")) {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
                                }
-                               os.write (jsonArray.toString().getBytes() );    
-                               }else if (contentType.equalsIgnoreCase("text/plain")){
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                               } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
-                                       if ( contentType.equalsIgnoreCase("application/cambria-zip") )
-                                       {
-                                               os = new GZIPOutputStream ( baseStream );
-                                       }
-                                       for ( TimestampedMessage m : fPending )
-                                       {
-                                               
-                                               os.write ( ( "" + m.fPartition.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( ( "" + m.fMsg.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( m.fPartition.getBytes() );
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                                       os.close ();
-                               }else{
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                       
-                                       }
+                       } else if (contentType.equalsIgnoreCase("application/cambria")
+                                       || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+                               if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+                                       os = new GZIPOutputStream(baseStream);
                                }
-             
-               
+                               for (TimestampedMessage m : fPending) {
+
+                                       os.write(("" + m.fPartition.length()).getBytes());
+                                       os.write('.');
+                                       os.write(("" + m.fMsg.length()).getBytes());
+                                       os.write('.');
+                                       os.write(m.fPartition.getBytes());
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
+                               }
+                               os.close();
+                       } else {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
+
+                               }
+                       }
 
-                       final long startMs = Clock.now ();
+                       final long startMs = Clock.now();
                        if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-                               
-                       
+
                                try {
-                               DME2Configue();
-                               
-                               Thread.sleep(5);
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               sender.setPayload(os.toString());               
-                                               
-                               
-                               String dmeResponse = sender.sendAndWait(5000L);
-                               System.out.println("dmeres->"+dmeResponse);             
-                               
-                               
-                               pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
-                               
-                               if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-                                       
-                                       return pubResponse;
-                               }
-                               final String logLine = String.valueOf((Clock.now() - startMs))
-                                               + dmeResponse.toString();
-                               getLog().info(logLine);
-                               fPending.clear();
-                               
-                               }
-                               catch (DME2Exception x) {
+                                       DME2Configue();
+
+                                       Thread.sleep(5);
+                                       getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+                                                       + (nowMs - fPending.peek().timestamp) + " ms");
+                                       sender.setPayload(os.toString());
+
+                                       String dmeResponse = sender.sendAndWait(5000L);
+
+                                       pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
+
+                                       if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                                       || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+                                               return pubResponse;
+                                       }
+                                       final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
+                                       getLog().info(logLine);
+                                       fPending.clear();
+
+                               } catch (DME2Exception x) {
                                        getLog().warn(x.getMessage(), x);
                                        pubResponse.setResponseCode(x.getErrorCode());
                                        pubResponse.setResponseMessage(x.getErrorMessage());
                                } catch (URISyntaxException x) {
-                                       
+
                                        getLog().warn(x.getMessage(), x);
                                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                                        pubResponse.setResponseMessage(x.getMessage());
@@ -512,134 +504,149 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
                                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                                        pubResponse.setResponseMessage(x.getMessage());
-                                       
+                                       logger.error("exception: ", x);
+
                                }
-                               
+
                                return pubResponse;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               
-                               
-                               pubResponse = createMRPublisherResponse(result,pubResponse);
-                               
-                               if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-                                       
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
+                                               authDate, username, password, protocolFlag);
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+
+                               pubResponse = createMRPublisherResponse(result, pubResponse);
+
+                               if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                               || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
                                        return pubResponse;
                                }
-                               
+
                                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
                                getLog().info(logLine);
                                fPending.clear();
                                return pubResponse;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-                               
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               pubResponse = createMRPublisherResponse(result,pubResponse);
-                               
-                               if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-                                       
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
+                                               password, protocolFlag);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               pubResponse = createMRPublisherResponse(result, pubResponse);
+
+                               if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                               || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
                                        return pubResponse;
                                }
-                               
+
                                final String logLine = String.valueOf((Clock.now() - startMs));
                                getLog().info(logLine);
                                fPending.clear();
                                return pubResponse;
                        }
-               }
-               catch ( IllegalArgumentException x ) {
-                       getLog().warn ( x.getMessage(), x );
+                       
+                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               pubResponse = createMRPublisherResponse(result, pubResponse);
+
+                               if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                               || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+                                       return pubResponse;
+                               }
+
+                               final String logLine = String.valueOf((Clock.now() - startMs));
+                               getLog().info(logLine);
+                               fPending.clear();
+                               return pubResponse;
+                       }
+               } catch (IllegalArgumentException x) {
+                       getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
-               } catch ( IOException x ) {
-                       getLog().warn ( x.getMessage(), x );
+
+               } catch (IOException x) {
+                       getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
+
                } catch (HttpException x) {
-                       getLog().warn ( x.getMessage(), x );
+                       getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
+
                } catch (Exception x) {
                        getLog().warn(x.getMessage(), x);
-                       
+
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
+
                }
-               
+
                finally {
-                       if (fPending.size()>0) {
-                               getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+                       if (fPending.size() > 0) {
+                               getLog().warn("Send failed, " + fPending.size() + " message to send.");
                                pubResponse.setPendingMsgs(fPending.size());
                        }
                        if (os != null) {
                                try {
-                               os.close();
+                                       os.close();
                                } catch (Exception x) {
                                        getLog().warn(x.getMessage(), x);
                                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                                        pubResponse.setResponseMessage("Error in closing Output Stream");
                                }
-                               }
+                       }
                }
-               
+
                return pubResponse;
        }
-       
-private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
-               
-        if (reply.isEmpty()) 
-        {
-                
-                mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
-                mrPubResponse.setResponseMessage("Please verify the Producer properties");
-        }
-        else if(reply.startsWith("{"))
-        {
+
+       public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+               if (reply.isEmpty()) {
+
+                       mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+                       mrPubResponse.setResponseMessage("Please verify the Producer properties");
+               } else if (reply.startsWith("{")) {
                        JSONObject jObject = new JSONObject(reply);
-                       if(jObject.has("message") && jObject.has("status"))
-                       {
+                       if (jObject.has("message") && jObject.has("status")) {
                                String message = jObject.getString("message");
-                               if(null != message)
-                               {
-                                       mrPubResponse.setResponseMessage(message);      
+                               if (null != message) {
+                                       mrPubResponse.setResponseMessage(message);
                                }
                                mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+                       } else {
+                               mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+                               mrPubResponse.setResponseMessage(reply);
+                       }
+               } else if (reply.startsWith("<")) {
+                       String responseCode = getHTTPErrorResponseCode(reply);
+                       if (responseCode.contains("403")) {
+                               responseCode = "403";
                        }
-                       else
-                        {
-                                       mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
-                                       mrPubResponse.setResponseMessage(reply);        
-                        }
-     }
-        else if (reply.startsWith("<"))
-        {
-                String responseCode = getHTTPErrorResponseCode(reply);
-                if( responseCode.contains("403"))
-                       {
-                        responseCode = "403";
-                       }       
-                mrPubResponse.setResponseCode(responseCode);
-                       mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));   
-        }
-        
+                       mrPubResponse.setResponseCode(responseCode);
+                       mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+               }
+
                return mrPubResponse;
        }
 
@@ -652,10 +659,10 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        private String username;
        private String password;
        private String host;
-       
-       //host selector
+
+       // host selector
        private HostSelector fHostSelector = null;
-       
+
        private final LinkedBlockingQueue<TimestampedMessage> fPending;
        private long fDontSendUntilMs;
        private final ScheduledThreadPoolExecutor fExec;
@@ -678,25 +685,24 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        private HashMap<String, String> DMETimeOuts;
        private DME2Client sender;
        public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
-       public String producerFilePath;
        private String authKey;
        private String authDate;
        private String handlers;
        private Properties props;
        public static String routerFilePath;
-       public static Map<String, String> headers=new HashMap<String, String>();
+       protected static final Map<String, String> headers = new HashMap<String, String>();
        public static MultivaluedMap<String, Object> headersMap;
-       
-       
+
        private MRPublisherResponse pubResponse;
-       
+
        public MRPublisherResponse getPubResponse() {
                return pubResponse;
        }
+
        public void setPubResponse(MRPublisherResponse pubResponse) {
                this.pubResponse = pubResponse;
        }
-       
+
        public static String getRouterFilePath() {
                return routerFilePath;
        }
@@ -713,14 +719,6 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                this.props = props;
        }
 
-       public String getProducerFilePath() {
-               return producerFilePath;
-       }
-
-       public void setProducerFilePath(String producerFilePath) {
-               this.producerFilePath = producerFilePath;
-       }
-
        public String getProtocolFlag() {
                return protocolFlag;
        }
@@ -728,14 +726,10 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        public void setProtocolFlag(String protocolFlag) {
                this.protocolFlag = protocolFlag;
        }
-       
-       
+
        private void DME2Configue() throws Exception {
                try {
-                       
-               /*      FileReader reader = new FileReader(new File (producerFilePath));
-                       Properties props = new Properties();            
-                       props.load(reader);*/
+
                        latitude = props.getProperty("Latitude");
                        longitude = props.getProperty("Longitude");
                        version = props.getProperty("Version");
@@ -743,41 +737,38 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                        env = props.getProperty("Environment");
                        partner = props.getProperty("Partner");
                        routeOffer = props.getProperty("routeOffer");
-                       subContextPath = props.getProperty("SubContextPath")+fTopic;
-                       /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
-                               subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
-                       }*/                     
+                       subContextPath = props.getProperty("SubContextPath") + fTopic;
+                       
                        protocol = props.getProperty("Protocol");
                        methodType = props.getProperty("MethodType");
                        dmeuser = props.getProperty("username");
                        dmepassword = props.getProperty("password");
                        contentType = props.getProperty("contenttype");
                        handlers = props.getProperty("sessionstickinessrequired");
-                       routerFilePath= props.getProperty("DME2preferredRouterFilePath");
-                       
+                       routerFilePath = props.getProperty("DME2preferredRouterFilePath");
+
                        /**
-                        * Changes to DME2Client url to use Partner for auto failover between data centers
-                        * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
+                        * Changes to DME2Client url to use Partner for auto failover
+                        * between data centers When Partner value is not provided use the
+                        * routeOffer value for auto failover within a cluster
                         */
-                       
 
                        String partitionKey = props.getProperty("partition");
-                       
-                       if (partner != null && !partner.isEmpty() ) 
-                       { 
-                               url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
-                if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
-                    url = url + "&partitionKey=" + partitionKey;
-                }
-                       }
-                       else if (routeOffer!=null && !routeOffer.isEmpty()) 
-                       { 
-                               url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
-                if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
-                    url = url + "&partitionKey=" + partitionKey;
-                }
+
+                       if (partner != null && !partner.isEmpty()) {
+                               url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
+                                               + partner;
+                               if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+                                       url = url + "&partitionKey=" + partitionKey;
+                               }
+                       } else if (routeOffer != null && !routeOffer.isEmpty()) {
+                               url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+                                               + routeOffer;
+                               if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+                                       url = url + "&partitionKey=" + partitionKey;
+                               }
                        }
-                        
+
                        DMETimeOuts = new HashMap<String, String>();
                        DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
                        DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
@@ -785,56 +776,56 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                        DMETimeOuts.put("Content-Type", contentType);
                        System.setProperty("AFT_LATITUDE", latitude);
                        System.setProperty("AFT_LONGITUDE", longitude);
-                       System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
-                       //System.setProperty("DME2.DEBUG", "true");
-               //      System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
-                       //System.out.println("XXXXXX"+url);
+                       System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+                       // System.setProperty("DME2.DEBUG", "true");
+
+                       // SSL changes
+                       // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
                        
-                       //SSL changes
-                       System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-                                       "SSLv3,TLSv1,TLSv1.1");
+                       System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
                        System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
                        System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-                       
-                       //SSL changes
-                       
+
+                       // SSL changes
+
                        sender = new DME2Client(new URI(url), 5000L);
-                               
+
                        sender.setAllowAllHttpReturnCodes(true);
                        sender.setMethod(methodType);
-                       sender.setSubContext(subContextPath);   
+                       sender.setSubContext(subContextPath);
                        sender.setCredentials(dmeuser, dmepassword);
                        sender.setHeaders(DMETimeOuts);
-                       if(handlers.equalsIgnoreCase("yes")){
-                               sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
-                               sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+                       if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
+                               sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+                                               props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+                               sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
+                                               props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
                                sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
-                               }else{
-                                       sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
-                               }
+                       } else {
+                               sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+                       }
                } catch (DME2Exception x) {
                        getLog().warn(x.getMessage(), x);
-                       throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
+                       throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
                } catch (URISyntaxException x) {
-                       
+
                        getLog().warn(x.getMessage(), x);
-                       throw new URISyntaxException(url,x.getMessage());
+                       throw new URISyntaxException(url, x.getMessage());
                } catch (Exception x) {
 
                        getLog().warn(x.getMessage(), x);
-                       throw new Exception(x.getMessage());
+                       throw new IllegalArgumentException(x.getMessage());
                }
        }
-       
-       private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
-       {
-               super ( hosts );
 
-               if ( topic == null || topic.length() < 1 )
-               {
-                       throw new IllegalArgumentException ( "A topic must be provided." );
+       private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+                       boolean compress) throws MalformedURLException {
+               super(hosts);
+
+               if (topic == null || topic.length() < 1) {
+                       throw new IllegalArgumentException("A topic must be provided.");
                }
-               
+
                fHostSelector = new HostSelector(hosts, null);
                fClosed = false;
                fTopic = topic;
@@ -842,49 +833,45 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                fMaxBatchAgeMs = maxBatchAgeMs;
                fCompress = compress;
 
-               fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+               fPending = new LinkedBlockingQueue<TimestampedMessage>();
                fDontSendUntilMs = 0;
-               fExec = new ScheduledThreadPoolExecutor ( 1 );
+               fExec = new ScheduledThreadPoolExecutor(1);
                pubResponse = new MRPublisherResponse();
-               
+
        }
-       
-       private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
-       {
-               super ( hosts );
 
-               if ( topic == null || topic.length() < 1 )
-               {
-                       throw new IllegalArgumentException ( "A topic must be provided." );
+       private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+                       boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
+               super(hosts);
+
+               if (topic == null || topic.length() < 1) {
+                       throw new IllegalArgumentException("A topic must be provided.");
                }
-               
+
                fHostSelector = new HostSelector(hosts, null);
                fClosed = false;
                fTopic = topic;
                fMaxBatchSize = maxBatchSize;
                fMaxBatchAgeMs = maxBatchAgeMs;
                fCompress = compress;
-               threadOccuranceTime=httpThreadOccurnace;
-               fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+               threadOccuranceTime = httpThreadOccurnace;
+               fPending = new LinkedBlockingQueue<TimestampedMessage>();
                fDontSendUntilMs = 0;
-               fExec = new ScheduledThreadPoolExecutor ( 1 );
-               fExec.scheduleAtFixedRate ( new Runnable()
-               {
+               fExec = new ScheduledThreadPoolExecutor(1);
+               fExec.scheduleAtFixedRate(new Runnable() {
                        @Override
-                       public void run ()
-                       {
-                               send ( false );
+                       public void run() {
+                               send(false);
                        }
-               }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
+               }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
        }
 
-       private static class TimestampedMessage extends message
-       {
-               public TimestampedMessage ( message m )
-               {
-                       super ( m );
+       private static class TimestampedMessage extends message {
+               public TimestampedMessage(message m) {
+                       super(m);
                        timestamp = Clock.now();
                }
+
                public final long timestamp;
        }
 
@@ -935,5 +922,5 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        public void setAuthDate(String authDate) {
                this.authDate = authDate;
        }
-       
+
 }