Changes to the DMaap Client 55/28955/1
authorsunil.unnava <su622b@att.com>
Tue, 23 Jan 2018 20:26:15 +0000 (15:26 -0500)
committersunil.unnava <su622b@att.com>
Tue, 23 Jan 2018 20:36:05 +0000 (15:36 -0500)
Added new API to the DMaapClient

Issue-ID: DMAAP-214
Change-Id: I4de2da7ca42ad1b5925a2df9d26672875dd15b10
Signed-off-by: sunil.unnava <su622b@att.com>
pom.xml
src/main/java/com/att/nsa/mr/client/MRClientFactory.java
src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
version.properties

diff --git a/pom.xml b/pom.xml
index d893c94..c8327cc 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
        <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
        <artifactId>dmaapClient</artifactId>
        <packaging>jar</packaging>
-       <version>1.1.0-SNAPSHOT</version>
+       <version>1.1.1-SNAPSHOT</version>
        <name>dmaap-messagerouter-dmaapclient</name>
        <description>Client library for MR event routing API</description>
        <url>https://github.com/att/dmaap-framework</url>
index 59e472c..b654282 100644 (file)
@@ -43,158 +43,205 @@ import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 /**
  * A factory for MR clients.<br/>
  * <br/>
- * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates
- * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive
- * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's
- * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across
- * them. Be sure to use a different ID for each instance.)<br/>
+ * Use caution selecting a consumer creator factory. If the call doesn't accept
+ * a consumer group name, then it creates a consumer that is not restartable.
+ * That is, if you stop your process and start it again, your client will NOT
+ * receive any missed messages on the topic. If you need to ensure receipt of
+ * missed messages, then you must use a consumer that's created with a group
+ * name and ID. (If you create multiple consumer processes using the same group,
+ * load is split across them. Be sure to use a different ID for each
+ * instance.)<br/>
  * <br/>
- * Publishers  
+ * Publishers
  * 
  * @author author
  */
-public class MRClientFactory
-{
+public class MRClientFactory {
        public static MultivaluedMap<String, Object> HTTPHeadersMap;
        public static Map<String, String> DME2HeadersMap;
        public static String routeFilePath;
-       
+
        public static FileReader routeReader;
-       
-       public static FileWriter routeWriter= null;
-       public static Properties prop=null;
-       //routeReader= new FileReader(new File (routeFilePath));
-       //props= new Properties();
+
+       public static FileWriter routeWriter = null;
+       public static Properties prop = null;
+
+       // routeReader= new FileReader(new File (routeFilePath));
+       // props= new Properties();
        /**
-        * Create a consumer instance with the default timeout and no limit
-        * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
-        * across sessions.
+        * Create a consumer instance with the default timeout and no limit on
+        * messages returned. This consumer operates as an independent consumer
+        * (i.e., not in a group) and is NOT re-startable across sessions.
         * 
-        * @param hostList A comma separated list of hosts to use to connect to MR.
-        * You can include port numbers (3904 is the default). For example, "hostname:8080,"
+        * @param hostList
+        *            A comma separated list of hosts to use to connect to MR. You
+        *            can include port numbers (3904 is the default). For example,
+        *            "hostname:8080,"
         * 
-        * @param topic The topic to consume
+        * @param topic
+        *            The topic to consume
         * 
         * @return a consumer
         */
-       public static MRConsumer createConsumer ( String hostList, String topic )
-       {
-               return createConsumer ( MRConsumerImpl.stringToList(hostList), topic );
+       public static MRConsumer createConsumer(String hostList, String topic) {
+               return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
        }
 
        /**
-        * Create a consumer instance with the default timeout and no limit
-        * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
-        * across sessions.
+        * Create a consumer instance with the default timeout and no limit on
+        * messages returned. This consumer operates as an independent consumer
+        * (i.e., not in a group) and is NOT re-startable across sessions.
         * 
-        * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-        * @param topic The topic to consume
+        * @param hostSet
+        *            The host used in the URL to MR. Entries can be "host:port".
+        * @param topic
+        *            The topic to consume
         * 
         * @return a consumer
         */
-       public static MRConsumer createConsumer ( Collection<String> hostSet, String topic )
-       {
-               return createConsumer ( hostSet, topic, null );
+       public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
+               return createConsumer(hostSet, topic, null);
        }
 
        /**
-        * Create a consumer instance with server-side filtering, the default timeout, and no limit
-        * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
+        * Create a consumer instance with server-side filtering, the default
+        * timeout, and no limit on messages returned. This consumer operates as an
+        * independent consumer (i.e., not in a group) and is NOT re-startable
         * across sessions.
         * 
-        * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-        * @param topic The topic to consume
-        * @param filter a filter to use on the server side
+        * @param hostSet
+        *            The host used in the URL to MR. Entries can be "host:port".
+        * @param topic
+        *            The topic to consume
+        * @param filter
+        *            a filter to use on the server side
         * 
         * @return a consumer
         */
-       public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter )
-       {
-               return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null );
+       public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
+               return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
        }
 
        /**
-        * Create a consumer instance with the default timeout, and no limit
-        * on messages returned. This consumer can operate in a logical group and is re-startable
-        * across sessions when you use the same group and ID on restart.
+        * Create a consumer instance with the default timeout, and no limit on
+        * messages returned. This consumer can operate in a logical group and is
+        * re-startable across sessions when you use the same group and ID on
+        * restart.
         * 
-        * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-        * @param topic The topic to consume
-        * @param consumerGroup The name of the consumer group this consumer is part of
-        * @param consumerId The unique id of this consume in its group
+        * @param hostSet
+        *            The host used in the URL to MR. Entries can be "host:port".
+        * @param topic
+        *            The topic to consume
+        * @param consumerGroup
+        *            The name of the consumer group this consumer is part of
+        * @param consumerId
+        *            The unique id of this consume in its group
         * 
         * @return a consumer
         */
-       public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId )
-       {
-               return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 );
+       public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+                       final String consumerId) {
+               return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
        }
 
        /**
-        * Create a consumer instance with the default timeout, and no limit
-        * on messages returned. This consumer can operate in a logical group and is re-startable
-        * across sessions when you use the same group and ID on restart.
+        * Create a consumer instance with the default timeout, and no limit on
+        * messages returned. This consumer can operate in a logical group and is
+        * re-startable across sessions when you use the same group and ID on
+        * restart.
         * 
-        * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-        * @param topic The topic to consume
-        * @param consumerGroup The name of the consumer group this consumer is part of
-        * @param consumerId The unique id of this consume in its group
-        * @param timeoutMs     The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
-        * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+        * @param hostSet
+        *            The host used in the URL to MR. Entries can be "host:port".
+        * @param topic
+        *            The topic to consume
+        * @param consumerGroup
+        *            The name of the consumer group this consumer is part of
+        * @param consumerId
+        *            The unique id of this consume in its group
+        * @param timeoutMs
+        *            The amount of time in milliseconds that the server should keep
+        *            the connection open while waiting for message traffic. Use -1
+        *            for default timeout.
+        * @param limit
+        *            A limit on the number of messages returned in a single call.
+        *            Use -1 for no limit.
         * 
         * @return a consumer
         */
-       public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit)
-       {
-               return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null );
+       public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+                       final String consumerId, int timeoutMs, int limit) {
+               return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
        }
 
        /**
-        * Create a consumer instance with the default timeout, and no limit
-        * on messages returned. This consumer can operate in a logical group and is re-startable
-        * across sessions when you use the same group and ID on restart. This consumer also uses
-        * server-side filtering.
-        * 
-        * @param hostList A comma separated list of hosts to use to connect to MR.
-        * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
-        * @param topic The topic to consume
-        * @param consumerGroup The name of the consumer group this consumer is part of
-        * @param consumerId The unique id of this consume in its group
-        * @param timeoutMs     The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
-        * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
-        * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+        * Create a consumer instance with the default timeout, and no limit on
+        * messages returned. This consumer can operate in a logical group and is
+        * re-startable across sessions when you use the same group and ID on
+        * restart. This consumer also uses server-side filtering.
+        * 
+        * @param hostList
+        *            A comma separated list of hosts to use to connect to MR. You
+        *            can include port numbers (3904 is the default). For example,
+        *            "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+        * @param topic
+        *            The topic to consume
+        * @param consumerGroup
+        *            The name of the consumer group this consumer is part of
+        * @param consumerId
+        *            The unique id of this consume in its group
+        * @param timeoutMs
+        *            The amount of time in milliseconds that the server should keep
+        *            the connection open while waiting for message traffic. Use -1
+        *            for default timeout.
+        * @param limit
+        *            A limit on the number of messages returned in a single call.
+        *            Use -1 for no limit.
+        * @param filter
+        *            A Highland Park filter expression using only built-in filter
+        *            components. Use null for "no filter".
         * 
         * @return a consumer
         */
-       public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup,
-               final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
-       {
-               return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup,
-                       consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+       public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
+                       final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+               return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
+                               filter, apiKey, apiSecret);
        }
 
        /**
-        * Create a consumer instance with the default timeout, and no limit
-        * on messages returned. This consumer can operate in a logical group and is re-startable
-        * across sessions when you use the same group and ID on restart. This consumer also uses
-        * server-side filtering.
-        * 
-        * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-        * @param topic The topic to consume
-        * @param consumerGroup The name of the consumer group this consumer is part of
-        * @param consumerId The unique id of this consume in its group
-        * @param timeoutMs     The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
-        * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
-        * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+        * Create a consumer instance with the default timeout, and no limit on
+        * messages returned. This consumer can operate in a logical group and is
+        * re-startable across sessions when you use the same group and ID on
+        * restart. This consumer also uses server-side filtering.
+        * 
+        * @param hostSet
+        *            The host used in the URL to MR. Entries can be "host:port".
+        * @param topic
+        *            The topic to consume
+        * @param consumerGroup
+        *            The name of the consumer group this consumer is part of
+        * @param consumerId
+        *            The unique id of this consume in its group
+        * @param timeoutMs
+        *            The amount of time in milliseconds that the server should keep
+        *            the connection open while waiting for message traffic. Use -1
+        *            for default timeout.
+        * @param limit
+        *            A limit on the number of messages returned in a single call.
+        *            Use -1 for no limit.
+        * @param filter
+        *            A Highland Park filter expression using only built-in filter
+        *            components. Use null for "no filter".
         * 
         * @return a consumer
         */
-       public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup,
-               final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
-       {
-               if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock;
+       public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+                       final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+               if (MRClientBuilders.sfConsumerMock != null)
+                       return MRClientBuilders.sfConsumerMock;
                try {
-                       return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+                       return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
+                                       apiSecret);
                } catch (MalformedURLException e) {
                        throw new RuntimeException(e);
                }
@@ -203,282 +250,339 @@ public class MRClientFactory
        /*************************************************************************/
        /*************************************************************************/
        /*************************************************************************/
-       
+
        /**
-        * Create a publisher that sends each message (or group of messages) immediately. Most
-        * applications should favor higher latency for much higher message throughput and the
-        * "simple publisher" is not a good choice. 
-        *  
-        * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
-        * @param topic The topic on which to publish messages.
+        * Create a publisher that sends each message (or group of messages)
+        * immediately. Most applications should favor higher latency for much
+        * higher message throughput and the "simple publisher" is not a good
+        * choice.
+        * 
+        * @param hostlist
+        *            The host used in the URL to MR. Can be "host:port", can be
+        *            multiple comma-separated entries.
+        * @param topic
+        *            The topic on which to publish messages.
         * @return a publisher
         */
-       public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic )
-       {
-               return createBatchingPublisher ( hostlist, topic, 1, 1 );
+       public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
+               return createBatchingPublisher(hostlist, topic, 1, 1);
        }
 
        /**
-        * Create a publisher that batches messages. Be sure to close the publisher to
-        * send the last batch and ensure a clean shutdown. Message payloads are not compressed.
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown. Message payloads are
+        * not compressed.
         * 
-        * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
-        * @param topic The topic on which to publish messages.
-        * @param maxBatchSize The largest set of messages to batch
-        * @param maxAgeMs The maximum age of a message waiting in a batch
+        * @param hostlist
+        *            The host used in the URL to MR. Can be "host:port", can be
+        *            multiple comma-separated entries.
+        * @param topic
+        *            The topic on which to publish messages.
+        * @param maxBatchSize
+        *            The largest set of messages to batch
+        * @param maxAgeMs
+        *            The maximum age of a message waiting in a batch
         * 
         * @return a publisher
         */
-       public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs )
-       {
-               return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false );
+       public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+                       long maxAgeMs) {
+               return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
        }
 
        /**
-        * Create a publisher that batches messages. Be sure to close the publisher to
-        * send the last batch and ensure a clean shutdown. 
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown.
         * 
-        * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
-        * @param topic The topic on which to publish messages.
-        * @param maxBatchSize The largest set of messages to batch
-        * @param maxAgeMs The maximum age of a message waiting in a batch
-        * @param compress use gzip compression
+        * @param hostlist
+        *            The host used in the URL to MR. Can be "host:port", can be
+        *            multiple comma-separated entries.
+        * @param topic
+        *            The topic on which to publish messages.
+        * @param maxBatchSize
+        *            The largest set of messages to batch
+        * @param maxAgeMs
+        *            The maximum age of a message waiting in a batch
+        * @param compress
+        *            use gzip compression
         * 
         * @return a publisher
         */
-       public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
-       {
-               return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress );
+       public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+                       long maxAgeMs, boolean compress) {
+               return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
        }
 
        /**
-        * Create a publisher that batches messages. Be sure to close the publisher to
-        * send the last batch and ensure a clean shutdown. 
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown.
         * 
-        * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-        * @param topic The topic on which to publish messages.
-        * @param maxBatchSize The largest set of messages to batch
-        * @param maxAgeMs The maximum age of a message waiting in a batch
-        * @param compress use gzip compression
+        * @param hostSet
+        *            A set of hosts to be used in the URL to MR. Can be
+        *            "host:port". Use multiple entries to enable failover.
+        * @param topic
+        *            The topic on which to publish messages.
+        * @param maxBatchSize
+        *            The largest set of messages to batch
+        * @param maxAgeMs
+        *            The maximum age of a message waiting in a batch
+        * @param compress
+        *            use gzip compression
         * 
         * @return a publisher
         */
-       public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
-       {
-               final TreeSet<String> hosts = new TreeSet<String> ();
-               for ( String hp : hostSet )
-               {
-                       hosts.add ( hp );
+       public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
+                       long maxAgeMs, boolean compress) {
+               final TreeSet<String> hosts = new TreeSet<String>();
+               for (String hp : hostSet) {
+                       hosts.add(hp);
                }
-               return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress );
+               return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
        }
 
        /**
-        * Create a publisher that batches messages. Be sure to close the publisher to
-        * send the last batch and ensure a clean shutdown. 
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown.
         * 
-        * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-        * @param topic The topic on which to publish messages.
-        * @param maxBatchSize The largest set of messages to batch
-        * @param maxAgeMs The maximum age of a message waiting in a batch
-        * @param compress use gzip compression
+        * @param hostSet
+        *            A set of hosts to be used in the URL to MR. Can be
+        *            "host:port". Use multiple entries to enable failover.
+        * @param topic
+        *            The topic on which to publish messages.
+        * @param maxBatchSize
+        *            The largest set of messages to batch
+        * @param maxAgeMs
+        *            The maximum age of a message waiting in a batch
+        * @param compress
+        *            use gzip compression
         * 
         * @return a publisher
         */
-       public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
-       {
-               return new MRSimplerBatchPublisher.Builder ().
-                       againstUrls ( hostSet ).
-                       onTopic ( topic ).
-                       batchTo ( maxBatchSize, maxAgeMs ).
-                       compress ( compress ).
-                       build ();
+       public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
+                       int maxBatchSize, long maxAgeMs, boolean compress) {
+               return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+                               .compress(compress).build();
        }
-       
+
        /**
-        * Create a publisher that batches messages. Be sure to close the publisher to
-        * send the last batch and ensure a clean shutdown. 
-        * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-        * @param topic The topic on which to publish messages.
-        * @param username username 
-        * @param password password
-        * @param maxBatchSize The largest set of messages to batch
-        * @param maxAgeMs The maximum age of a message waiting in a batch
-        * @param compress use gzip compression
-        * @param protocolFlag  http auth or ueb auth or dme2 method
-        * @param producerFilePath all properties for publisher
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown.
+        * 
+        * @param host
+        *            A host to be used in the URL to MR. Can be "host:port". Use
+        *            multiple entries to enable failover.
+        * @param topic
+        *            The topic on which to publish messages.
+        * @param username
+        *            username
+        * @param password
+        *            password
+        * @param maxBatchSize
+        *            The largest set of messages to batch
+        * @param maxAgeMs
+        *            The maximum age of a message waiting in a batch
+        * @param compress
+        *            use gzip compression
+        * @param protocolFlag
+        *            http auth or ueb auth or dme2 method
+        * @param producerFilePath
+        *            all properties for publisher
         * @return MRBatchingPublisher obj
         */
-       public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath )
-       {
-               MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
-                       againstUrls(MRConsumerImpl.stringToList(host)).
-                       onTopic ( topic ).
-                       batchTo ( maxBatchSize, maxAgeMs ).
-                       compress ( compress ).
-                       build ();
-               
+       public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
+                       final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
+                       String producerFilePath) {
+               MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
+                               .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+                               .compress(compress).build();
+
                pub.setHost(host);
                pub.setUsername(username);
                pub.setPassword(password);
                pub.setProtocolFlag(protocolFlag);
-               pub.setProducerFilePath(producerFilePath);
                return pub;
        }
-       
-       
+
        /**
-        * Create a publisher that batches messages. Be sure to close the publisher to
-        * send the last batch and ensure a clean shutdown
-        * @param producerFilePath set all properties for publishing message
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown
+        * 
+        * @param Properties
+        *            props set all properties for publishing message
         * @return MRBatchingPublisher obj
-        * @throws FileNotFoundException exc
-        * @throws IOException ioex
+        * @throws FileNotFoundException
+        *             exc
+        * @throws IOException
+        *             ioex
         */
-       public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException    {
-               FileReader reader = new FileReader(new File (producerFilePath));
-               Properties props = new Properties();            
+       public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
+                       throws FileNotFoundException, IOException {
+               return createInternalBatchingPublisher(props, withResponse);
+       }
+
+       /**
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown
+        * 
+        * @param Properties
+        *            props set all properties for publishing message
+        * @return MRBatchingPublisher obj
+        * @throws FileNotFoundException
+        *             exc
+        * @throws IOException
+        *             ioex
+        */
+       public static MRBatchingPublisher createBatchingPublisher(Properties props)
+                       throws FileNotFoundException, IOException {
+               return createInternalBatchingPublisher(props, false);
+       }
+
+       /**
+        * Create a publisher that batches messages. Be sure to close the publisher
+        * to send the last batch and ensure a clean shutdown
+        * 
+        * @param producerFilePath
+        *            set all properties for publishing message
+        * @return MRBatchingPublisher obj
+        * @throws FileNotFoundException
+        *             exc
+        * @throws IOException
+        *             ioex
+        */
+       public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
+                       throws FileNotFoundException, IOException {
+               FileReader reader = new FileReader(new File(producerFilePath));
+               Properties props = new Properties();
                props.load(reader);
-               MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
-                       againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
-                       onTopic ( props.getProperty("topic") ).
-                       batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
-                       compress (Boolean.parseBoolean(props.getProperty("compress"))).
-                       httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
-                       build ();               
-               pub.setHost(props.getProperty("host"));
-               if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-                               
-                       pub.setAuthKey(props.getProperty("authKey"));
-                       pub.setAuthDate(props.getProperty("authDate"));
-                       pub.setUsername(props.getProperty("username"));
-                       pub.setPassword(props.getProperty("password"));
-               }else{
-                       pub.setUsername(props.getProperty("username"));
-                       pub.setPassword(props.getProperty("password"));
-               }
-               pub.setProducerFilePath(producerFilePath);
-               pub.setProtocolFlag(props.getProperty("TransportType"));
-               pub.setProps(props);
-               routeFilePath=props.getProperty("DME2preferredRouterFilePath");
-               routeReader= new FileReader(new File (routeFilePath));
-               prop= new Properties();
-               File fo= new File(routeFilePath);
-               if(!fo.exists()){
-                       routeWriter=new FileWriter(new File(routeFilePath));
-               }
-               //pub.setContentType(contentType);
-               return pub;
+               return createBatchingPublisher(props);
        }
-       
+
        /**
-        * Create a publisher that will contain send methods that return 
-        * response object to user. 
-        * @param producerFilePath set all properties for publishing message
+        * Create a publisher that will contain send methods that return response
+        * object to user.
+        * 
+        * @param producerFilePath
+        *            set all properties for publishing message
         * @return MRBatchingPublisher obj
-        * @throws FileNotFoundException exc
-        * @throws IOException ioex
+        * @throws FileNotFoundException
+        *             exc
+        * @throws IOException
+        *             ioex
         */
-       public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException      {
-               FileReader reader = new FileReader(new File (producerFilePath));
-               Properties props = new Properties();            
+       public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
+                       throws FileNotFoundException, IOException {
+               FileReader reader = new FileReader(new File(producerFilePath));
+               Properties props = new Properties();
                props.load(reader);
-               MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
-                       againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
-                       onTopic ( props.getProperty("topic") ).
-                       batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
-                       compress (Boolean.parseBoolean(props.getProperty("compress"))).
-                       httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
-                       withResponse(withResponse).
-                       build ();               
+               return createBatchingPublisher(props, withResponse);
+       }
+
+       protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
+                       throws FileNotFoundException, IOException {
+               assert props != null;
+               MRSimplerBatchPublisher pub;
+               if (withResponse) {
+                       pub = new MRSimplerBatchPublisher.Builder()
+                                       .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+                                       .onTopic(props.getProperty("topic"))
+                                       .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+                                                       Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+                                       .compress(Boolean.parseBoolean(props.getProperty("compress")))
+                                       .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+                                       .withResponse(withResponse).build();
+               } else {
+                       pub = new MRSimplerBatchPublisher.Builder()
+                                       .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+                                       .onTopic(props.getProperty("topic"))
+                                       .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+                                                       Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+                                       .compress(Boolean.parseBoolean(props.getProperty("compress")))
+                                       .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+               }
                pub.setHost(props.getProperty("host"));
-               if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-                               
+               if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+
                        pub.setAuthKey(props.getProperty("authKey"));
                        pub.setAuthDate(props.getProperty("authDate"));
                        pub.setUsername(props.getProperty("username"));
                        pub.setPassword(props.getProperty("password"));
-               }else{
+               } else {
                        pub.setUsername(props.getProperty("username"));
                        pub.setPassword(props.getProperty("password"));
                }
-               pub.setProducerFilePath(producerFilePath);
                pub.setProtocolFlag(props.getProperty("TransportType"));
                pub.setProps(props);
-               routeFilePath=props.getProperty("DME2preferredRouterFilePath");
-               routeReader= new FileReader(new File (routeFilePath));
-               prop= new Properties();
-               File fo= new File(routeFilePath);
-               if(!fo.exists()){
-                       routeWriter=new FileWriter(new File(routeFilePath));
+               routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+               routeReader = new FileReader(new File(routeFilePath));
+               prop = new Properties();
+               File fo = new File(routeFilePath);
+               if (!fo.exists()) {
+                       routeWriter = new FileWriter(new File(routeFilePath));
                }
-               //pub.setContentType(contentType);
                return pub;
        }
-       
-       
-       
-
-       
-       
-       
-       
-       
 
-       
        /**
         * Create an identity manager client to work with API keys.
-        * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-        * @param apiKey Your API key
-        * @param apiSecret Your API secret
+        * 
+        * @param hostSet
+        *            A set of hosts to be used in the URL to MR. Can be
+        *            "host:port". Use multiple entries to enable failover.
+        * @param apiKey
+        *            Your API key
+        * @param apiSecret
+        *            Your API secret
         * @return an identity manager
         */
-       public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret )
-       {
+       public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
                MRIdentityManager cim;
                try {
-                       cim = new MRMetaClient ( hostSet );
+                       cim = new MRMetaClient(hostSet);
                } catch (MalformedURLException e) {
                        throw new RuntimeException(e);
                }
-               cim.setApiCredentials ( apiKey, apiSecret );
+               cim.setApiCredentials(apiKey, apiSecret);
                return cim;
        }
 
        /**
         * Create a topic manager for working with topics.
-        * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-        * @param apiKey Your API key
-        * @param apiSecret Your API secret
+        * 
+        * @param hostSet
+        *            A set of hosts to be used in the URL to MR. Can be
+        *            "host:port". Use multiple entries to enable failover.
+        * @param apiKey
+        *            Your API key
+        * @param apiSecret
+        *            Your API secret
         * @return a topic manager
         */
-       public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret )
-       {
+       public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
                MRMetaClient tmi;
                try {
-                       tmi = new MRMetaClient ( hostSet );
+                       tmi = new MRMetaClient(hostSet);
                } catch (MalformedURLException e) {
                        throw new RuntimeException(e);
                }
-               tmi.setApiCredentials ( apiKey, apiSecret );
+               tmi.setApiCredentials(apiKey, apiSecret);
                return tmi;
        }
 
        /**
         * Inject a consumer. Used to support unit tests.
+        * 
         * @param cc
         */
-       public static void $testInject ( MRConsumer cc )
-       {
+       public static void $testInject(MRConsumer cc) {
                MRClientBuilders.sfConsumerMock = cc;
        }
 
-       public static MRConsumer createConsumer(String host, String topic, String username,
-                       String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) {
+       public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+                       String id, int i, int j, String protocalFlag, String consumerFilePath) {
 
                MRConsumerImpl sub;
                try {
-                       sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+                       sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
                } catch (MalformedURLException e) {
                        throw new RuntimeException(e);
                }
@@ -488,15 +592,15 @@ public class MRClientFactory
                sub.setProtocolFlag(protocalFlag);
                sub.setConsumerFilePath(consumerFilePath);
                return sub;
-       
+
        }
-       
-       public static MRConsumer createConsumer(String host, String topic, String username,
-                       String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) {
+
+       public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+                       String id, String protocalFlag, String consumerFilePath, int i, int j) {
 
                MRConsumerImpl sub;
                try {
-                       sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+                       sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
                } catch (MalformedURLException e) {
                        throw new RuntimeException(e);
                }
@@ -506,52 +610,61 @@ public class MRClientFactory
                sub.setProtocolFlag(protocalFlag);
                sub.setConsumerFilePath(consumerFilePath);
                return sub;
-       
+
        }
 
-       public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException {
-               FileReader reader = new FileReader(new File (consumerFilePath));
-               Properties props = new Properties();            
+       public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
+               FileReader reader = new FileReader(new File(consumerFilePath));
+               Properties props = new Properties();
                props.load(reader);
+
+               return createConsumer(props);
+       }
+
+       public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
                int timeout;
-               if(props.getProperty("timeout")!=null)
-                       timeout=Integer.parseInt(props.getProperty("timeout"));
+               if (props.getProperty("timeout") != null)
+                       timeout = Integer.parseInt(props.getProperty("timeout"));
                else
-                       timeout=-1;
+                       timeout = -1;
                int limit;
-               if(props.getProperty("limit")!=null)
-                       limit=Integer.parseInt(props.getProperty("limit"));
+               if (props.getProperty("limit") != null)
+                       limit = Integer.parseInt(props.getProperty("limit"));
                else
-                       limit=-1;
+                       limit = -1;
                String group;
-               if(props.getProperty("group")==null)
-               group=UUID.randomUUID ().toString();
+               if (props.getProperty("group") == null)
+                       group = UUID.randomUUID().toString();
                else
-                       group=props.getProperty("group");
-               MRConsumerImpl sub=null;
-               if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-                       sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout,  limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate")  );
+                       group = props.getProperty("group");
+               MRConsumerImpl sub = null;
+               if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+                       sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+                                       group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+                                       props.getProperty("authKey"), props.getProperty("authDate"));
                        sub.setAuthKey(props.getProperty("authKey"));
                        sub.setAuthDate(props.getProperty("authDate"));
                        sub.setUsername(props.getProperty("username"));
                        sub.setPassword(props.getProperty("password"));
-               }else{
-                       sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password")  );
+               } else {
+                       sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+                                       group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+                                       props.getProperty("username"), props.getProperty("password"));
                        sub.setUsername(props.getProperty("username"));
                        sub.setPassword(props.getProperty("password"));
                }
                sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
-           sub.setProps(props);
+               sub.setProps(props);
                sub.setHost(props.getProperty("host"));
                sub.setProtocolFlag(props.getProperty("TransportType"));
-               //sub.setConsumerFilePath(consumerFilePath);
+               // sub.setConsumerFilePath(consumerFilePath);
                sub.setfFilter(props.getProperty("filter"));
-               routeFilePath=props.getProperty("DME2preferredRouterFilePath");
-               routeReader= new FileReader(new File (routeFilePath));
-               prop= new Properties();
-               File fo= new File(routeFilePath);
-               if(!fo.exists()){
-                               routeWriter=new FileWriter(new File(routeFilePath));
+               routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+               routeReader = new FileReader(new File(routeFilePath));
+               prop = new Properties();
+               File fo = new File(routeFilePath);
+               if (!fo.exists()) {
+                       routeWriter = new FileWriter(new File(routeFilePath));
                }
                return sub;
        }
index 012e95e..999d7ef 100644 (file)
@@ -50,286 +50,318 @@ import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 //import com.fasterxml.jackson.core.JsonProcessingException;
 
-public class MRBaseClient extends HttpClient implements MRClient 
-{
-       
+public class MRBaseClient extends HttpClient implements MRClient {
+
        private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
        private static final String MR_DATE_CONSTANT = "X-CambriaDate";
-       
-       protected MRBaseClient ( Collection<String> hosts ) throws MalformedURLException
-       {
-               super ( ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort );
-               
-               fLog = LoggerFactory.getLogger ( this.getClass().getName () );
-       }
 
-       protected MRBaseClient ( Collection<String> hosts, int stdSvcPort ) throws MalformedURLException {
-               super ( ConnectionType.HTTP,hosts, stdSvcPort);
+       protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
+               super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort);
 
-               fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+               fLog = LoggerFactory.getLogger(this.getClass().getName());
        }
 
-       protected MRBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
-       {
-               super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
+       protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException {
+               super(ConnectionType.HTTP, hosts, stdSvcPort);
 
-               fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+               fLog = LoggerFactory.getLogger(this.getClass().getName());
        }
 
+       protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
+               super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L,
+                               TimeUnit.MILLISECONDS, 32, 32, 600000);
+
+               fLog = LoggerFactory.getLogger(this.getClass().getName());
+       }
 
        @Override
-       public void close ()
-       {
+       public void close() {
        }
 
-       protected Set<String> jsonArrayToSet ( JSONArray a )
-       {
-               if ( a == null ) return null;
+       protected Set<String> jsonArrayToSet(JSONArray a) {
+               if (a == null)
+                       return null;
 
-               final TreeSet<String> set = new TreeSet<String> ();
-               for ( int i=0; i<a.length (); i++ )
-               {
-                       set.add ( a.getString ( i ));
+               final TreeSet<String> set = new TreeSet<String>();
+               for (int i = 0; i < a.length(); i++) {
+                       set.add(a.getString(i));
                }
                return set;
        }
 
-       public void logTo ( Logger log )
-       {
+       public void logTo(Logger log) {
                fLog = log;
-               replaceLogger ( log );
+               replaceLogger(log);
        }
 
-       protected Logger getLog ()
-       {
+       protected Logger getLog() {
                return fLog;
        }
 
        private Logger fLog;
-       
-       public JSONObject post(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+       public JSONObject post(final String path, final byte[] data, final String contentType, final String username,
+                       final String password, final String protocolFlag) throws HttpException, JSONException {
                if ((null != username && null != password)) {
                        WebTarget target = null;
 
                        Response response = null;
-                       
+
                        target = getTarget(path, username, password);
-                       String encoding = Base64.encodeAsString(username+":"+password);
-                       
-                       
-                       response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType));
-                       
+                       String encoding = Base64.encodeAsString(username + ":" + password);
+
+                       response = target.request().header("Authorization", "Basic " + encoding)
+                                       .post(Entity.entity(data, contentType));
+
                        return getResponseDataInJson(response);
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
                }
        }
-       public String postWithResponse(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+       public String postWithResponse(final String path, final byte[] data, final String contentType,
+                       final String username, final String password, final String protocolFlag)
+                       throws HttpException, JSONException {
                String responseData = null;
                if ((null != username && null != password)) {
                        WebTarget target = null;
 
                        Response response = null;
-                       
+
                        target = getTarget(path, username, password);
-                       String encoding = Base64.encodeAsString(username+":"+password);
-                       
-                       
-                       response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType));
-                       
+                       String encoding = Base64.encodeAsString(username + ":" + password);
+
+                       response = target.request().header("Authorization", "Basic " + encoding)
+                                       .post(Entity.entity(data, contentType));
+
                        responseData = response.readEntity(String.class);
                        return responseData;
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
                }
        }
-       public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+       public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,
+                       final String authDate, final String username, final String password, final String protocolFlag)
+                       throws HttpException, JSONException {
                if ((null != username && null != password)) {
                        WebTarget target = null;
 
                        Response response = null;
-                               target= getTarget(path,username, password);
-                               response = target.request()
-                                               .header(MR_AUTH_CONSTANT, authKey)
-                                               .header(MR_DATE_CONSTANT, authDate)
-                                               .post(Entity.entity(data, contentType));
-                               
+                       target = getTarget(path, username, password);
+                       response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate)
+                                       .post(Entity.entity(data, contentType));
+
                        return getResponseDataInJson(response);
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
                }
        }
-       public String postAuthwithResponse(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+       public String postAuthwithResponse(final String path, final byte[] data, final String contentType,
+                       final String authKey, final String authDate, final String username, final String password,
+                       final String protocolFlag) throws HttpException, JSONException {
                String responseData = null;
                if ((null != username && null != password)) {
                        WebTarget target = null;
 
                        Response response = null;
-                               target= getTarget(path,username, password);
-                               response = target.request()
-                                               .header(MR_AUTH_CONSTANT, authKey)
-                                               .header(MR_DATE_CONSTANT, authDate)
-                                               .post(Entity.entity(data, contentType));
-                               responseData = response.readEntity(String.class);
-                               return responseData;
-                       
+                       target = getTarget(path, username, password);
+                       response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate)
+                                       .post(Entity.entity(data, contentType));
+                       responseData = response.readEntity(String.class);
+                       return responseData;
+
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
                }
        }
 
-
-       public JSONObject get(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+       public JSONObject get(final String path, final String username, final String password, final String protocolFlag)
+                       throws HttpException, JSONException {
                if (null != username && null != password) {
-                       
+
                        WebTarget target = null;
 
                        Response response = null;
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               target=getTarget(path);
-                               response = target.request()
-                                               .header(MR_AUTH_CONSTANT, username)
-                                               .header(MR_DATE_CONSTANT, password)
-                                               .get();
+                               target = getTarget(path);
+                               response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
                        } else {
                                target = getTarget(path, username, password);
-                               String encoding = Base64.encodeAsString(username+":"+password);
-                               
-                               response = target.request().header("Authorization", "Basic " + encoding).get(); 
-                                               
+                               String encoding = Base64.encodeAsString(username + ":" + password);
+
+                               response = target.request().header("Authorization", "Basic " + encoding).get();
+
                        }
                        return getResponseDataInJson(response);
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
                }
        }
-       
-       
-       public String getResponse(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+       public String getResponse(final String path, final String username, final String password,
+                       final String protocolFlag) throws HttpException, JSONException {
                String responseData = null;
                if (null != username && null != password) {
-                       
+
                        WebTarget target = null;
 
                        Response response = null;
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               target=getTarget(path);
-                               response = target.request()
-                                               .header(MR_AUTH_CONSTANT, username)
-                                               .header(MR_DATE_CONSTANT, password)
-                                               .get();
+                               target = getTarget(path);
+                               response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
                        } else {
                                target = getTarget(path, username, password);
-                               String encoding = Base64.encodeAsString(username+":"+password);                         
-                               response = target.request().header("Authorization", "Basic " + encoding).get();                                                 
+                               String encoding = Base64.encodeAsString(username + ":" + password);
+                               response = target.request().header("Authorization", "Basic " + encoding).get();
                        }
-                       MRClientFactory.HTTPHeadersMap=response.getHeaders();
-               
-                       String transactionid=response.getHeaderString("transactionid");
-                               if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
-                                       fLog.info("TransactionId : " + transactionid);
+                       MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+                       String transactionid = response.getHeaderString("transactionid");
+                       if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+                               fLog.info("TransactionId : " + transactionid);
                        }
-                       
+
                        responseData = response.readEntity(String.class);
                        return responseData;
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
                }
        }
-       
-       public JSONObject getAuth(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+       public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username,
+                       final String password, final String protocolFlag) throws HttpException, JSONException {
                if (null != username && null != password) {
-                       
+
                        WebTarget target = null;
 
                        Response response = null;
-                               target=getTarget(path, username, password);
-                               response = target.request()
-                                               .header(MR_AUTH_CONSTANT, authKey)
-                                               .header(MR_DATE_CONSTANT, authDate)
-                                               .get();
-                                               
+                       target = getTarget(path, username, password);
+                       response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get();
+
                        return getResponseDataInJson(response);
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
                }
        }
-       
-       public String getAuthResponse(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+       public JSONObject getNoAuth(final String path, final String username, final String password,
+                       final String protocolFlag) throws HttpException, JSONException {
+               if (null != username && null != password) {
+
+                       WebTarget target = null;
+
+                       Response response = null;
+                       target = getTarget(path, username, password);
+                       response = target.request().get();
+
+                       return getResponseDataInJson(response);
+               } else {
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+               }
+       }
+
+       public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
+                       final String password, final String protocolFlag) throws HttpException, JSONException {
                String responseData = null;
                if (null != username && null != password) {
-                       
+
                        WebTarget target = null;
 
                        Response response = null;
-                               target=getTarget(path, username, password);
-                               response = target.request()
-                                               .header(MR_AUTH_CONSTANT, authKey)
-                                               .header(MR_DATE_CONSTANT, authDate)
-                                               .get();
-                               
-                               MRClientFactory.HTTPHeadersMap=response.getHeaders();
-                               
-                               String transactionid=response.getHeaderString("transactionid");
-                                       if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
-                                               fLog.info("TransactionId : " + transactionid);
-                               }
-                                               
-                               responseData = response.readEntity(String.class);
-                               return responseData;
+                       target = getTarget(path, username, password);
+                       response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get();
+
+                       MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+                       String transactionid = response.getHeaderString("transactionid");
+                       if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+                               fLog.info("TransactionId : " + transactionid);
+                       }
+
+                       responseData = response.readEntity(String.class);
+                       return responseData;
                } else {
-                       throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+                       throw new HttpException(
+                                       "Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+               }
+       }
+
+       public String getNoAuthResponse(String path, final String username, final String password,
+                       final String protocolFlag) throws HttpException, JSONException {
+               String responseData = null;
+
+               WebTarget target = null;
+
+               Response response = null;
+               target = getTarget(path, username, password);
+               response = target.request().get();
+
+               MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+               String transactionid = response.getHeaderString("transactionid");
+               if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+                       fLog.info("TransactionId : " + transactionid);
                }
+
+               responseData = response.readEntity(String.class);
+               return responseData;
+
        }
 
        private WebTarget getTarget(final String path, final String username, final String password) {
 
                Client client = ClientBuilder.newClient();
 
-               
-                       // Using UNIVERSAL as it supports both BASIC and DIGEST authentication types.
-                       HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
-                       client.register(feature);
-               
+               // Using UNIVERSAL as it supports both BASIC and DIGEST authentication
+               // types.
+               HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
+               client.register(feature);
+
                return client.target(path);
        }
 
-
        private WebTarget getTarget(final String path) {
 
                Client client = ClientBuilder.newClient();
                return client.target(path);
        }
+
        private JSONObject getResponseDataInJson(Response response) throws JSONException {
                try {
-                       MRClientFactory.HTTPHeadersMap=response.getHeaders();
-               //      fLog.info("DMAAP response status: " + response.getStatus());
-                       
-                                               
-                       //MultivaluedMap<String, Object> headersMap = response.getHeaders();
-                       //for(String key : headersMap.keySet()) {
-                       String transactionid=response.getHeaderString("transactionid");
-                       if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
+                       MRClientFactory.HTTPHeadersMap = response.getHeaders();
+                       // fLog.info("DMAAP response status: " + response.getStatus());
+
+                       // MultivaluedMap<String, Object> headersMap =
+                       // response.getHeaders();
+                       // for(String key : headersMap.keySet()) {
+                       String transactionid = response.getHeaderString("transactionid");
+                       if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
                                fLog.info("TransactionId : " + transactionid);
                        }
 
-                       /*final String responseData = response.readEntity(String.class);
-                       JSONTokener jsonTokener = new JSONTokener(responseData);
-                       JSONObject jsonObject = null;
-                       final char firstChar = jsonTokener.next();
-                       jsonTokener.back();
-                       if ('[' == firstChar) {
-                               JSONArray jsonArray = new JSONArray(jsonTokener);
-                               jsonObject = new JSONObject();
-                               jsonObject.put("result", jsonArray);
-                       } else {
-                               jsonObject = new JSONObject(jsonTokener);
-                       }
-
-                       return jsonObject;*/
-                       
-
-                       if(response.getStatus()==403) {
+                       /*
+                        * final String responseData = response.readEntity(String.class);
+                        * JSONTokener jsonTokener = new JSONTokener(responseData);
+                        * JSONObject jsonObject = null; final char firstChar =
+                        * jsonTokener.next(); jsonTokener.back(); if ('[' == firstChar) {
+                        * JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject =
+                        * new JSONObject(); jsonObject.put("result", jsonArray); } else {
+                        * jsonObject = new JSONObject(jsonTokener); }
+                        * 
+                        * return jsonObject;
+                        */
+
+                       if (response.getStatus() == 403) {
                                JSONObject jsonObject = null;
                                jsonObject = new JSONObject();
                                JSONArray jsonArray = new JSONArray();
@@ -339,11 +371,11 @@ public class MRBaseClient extends HttpClient implements MRClient
                                return jsonObject;
                        }
                        String responseData = response.readEntity(String.class);
-                               
+
                        JSONTokener jsonTokener = new JSONTokener(responseData);
                        JSONObject jsonObject = null;
                        final char firstChar = jsonTokener.next();
-               jsonTokener.back();
+                       jsonTokener.back();
                        if ('[' == firstChar) {
                                JSONArray jsonArray = new JSONArray(jsonTokener);
                                jsonObject = new JSONObject();
@@ -361,35 +393,35 @@ public class MRBaseClient extends HttpClient implements MRClient
                }
 
        }
-       
-       public String getHTTPErrorResponseMessage(String responseString){
-               
+
+       public String getHTTPErrorResponseMessage(String responseString) {
+
                String response = null;
                int beginIndex = 0;
                int endIndex = 0;
-               if(responseString.contains("<body>")){
-                       
-                       beginIndex = responseString.indexOf("body>")+5;
+               if (responseString.contains("<body>")) {
+
+                       beginIndex = responseString.indexOf("body>") + 5;
                        endIndex = responseString.indexOf("</body");
-                       response = responseString.substring(beginIndex,endIndex);
+                       response = responseString.substring(beginIndex, endIndex);
                }
-               
+
                return response;
-               
+
        }
-       
-       public String getHTTPErrorResponseCode(String responseString){
-               
+
+       public String getHTTPErrorResponseCode(String responseString) {
+
                String response = null;
                int beginIndex = 0;
                int endIndex = 0;
-               if(responseString.contains("<title>")){
-                       beginIndex = responseString.indexOf("title>")+6;
+               if (responseString.contains("<title>")) {
+                       beginIndex = responseString.indexOf("title>") + 6;
                        endIndex = responseString.indexOf("</title");
-                       response = responseString.substring(beginIndex,endIndex);
+                       response = responseString.substring(beginIndex, endIndex);
                }
-                               
-               return response;                
+
+               return response;
        }
-       
+
 }
index eb7fd91..78f37fc 100644 (file)
@@ -46,46 +46,43 @@ import org.slf4j.LoggerFactory;
 
 import com.att.aft.dme2.api.DME2Client;
 import com.att.aft.dme2.api.DME2Exception;
+import com.att.nsa.mr.client.HostSelector;
 import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.client.MRConsumer;
 import com.att.nsa.mr.client.response.MRConsumerResponse;
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 
-public class MRConsumerImpl extends MRBaseClient implements MRConsumer
-{
-       
+public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
+
        private static final String SUCCESS_MESSAGE = "Success";
-       
-       
-       private Logger log = LoggerFactory.getLogger ( this.getClass().getName () );
-       public static List<String> stringToList ( String str )
-       {
-               final LinkedList<String> set = new LinkedList<String> ();
-               if ( str != null )
-               {
-                       final String[] parts = str.trim ().split ( "," );
-                       for ( String part : parts )
-                       {
+
+       private Logger log = LoggerFactory.getLogger(this.getClass().getName());
+
+       public static List<String> stringToList(String str) {
+               final LinkedList<String> set = new LinkedList<String>();
+               if (str != null) {
+                       final String[] parts = str.trim().split(",");
+                       for (String part : parts) {
                                final String trimmed = part.trim();
-                               if ( trimmed.length () > 0 )
-                               {
-                                       set.add ( trimmed );
+                               if (trimmed.length() > 0) {
+                                       set.add(trimmed);
                                }
                        }
                }
                return set;
        }
-       
-       public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
-                       final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException
-               {
-                       this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false );
-               }
-       
-       public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
-               final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException
-       {
-               super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId );
+
+       public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+                       final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
+                       String apiSecret_password) throws MalformedURLException {
+               this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
+                               false);
+       }
+
+       public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+                       final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
+                       boolean allowSelfSignedCerts) throws MalformedURLException {
+               super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
 
                fTopic = topic;
                fGroup = consumerGroup;
@@ -94,233 +91,243 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
                fLimit = limit;
                fFilter = filter;
 
-               //setApiCredentials ( apiKey, apiSecret );
+               fHostSelector = new HostSelector(hostPart);
        }
 
        @Override
-       public Iterable<String> fetch () throws IOException,Exception
-       {
+       public Iterable<String> fetch() throws IOException, Exception {
                // fetch with the timeout and limit set in constructor
-               return fetch ( fTimeoutMs, fLimit );
+               return fetch(fTimeoutMs, fLimit);
        }
 
        @Override
-       public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception
-       {
-               final LinkedList<String> msgs = new LinkedList<String> ();
-
-// FIXME: the timeout on the socket needs to be at least as long as the long poll
-//             // sanity check for long poll timeout vs. socket read timeout
-//             final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
-//             if ( timeoutMs > maxReasonableTimeoutMs )
-//             {
-//                     log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" +
-//                             CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." );
-//                     timeoutMs = maxReasonableTimeoutMs;
-//             }
-
-       //      final String urlPath = createUrlPath ( timeoutMs, limit );
-
-               //getLog().info ( "UEB GET " + urlPath );
-               try
-               {
+       public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
+               final LinkedList<String> msgs = new LinkedList<String>();
+
+               // FIXME: the timeout on the socket needs to be at least as long as the
+               // long poll
+               // // sanity check for long poll timeout vs. socket read timeout
+               // final int maxReasonableTimeoutMs =
+               // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
+               // if ( timeoutMs > maxReasonableTimeoutMs )
+               // {
+               // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
+               // socket read timeout (" +
+               // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
+               // timeout to " + maxReasonableTimeoutMs + "." );
+               // timeoutMs = maxReasonableTimeoutMs;
+               // }
+
+               // final String urlPath = createUrlPath ( timeoutMs, limit );
+
+               // getLog().info ( "UEB GET " + urlPath );
+               try {
                        if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
                                DMEConfigure(timeoutMs, limit);
-                       try 
-                       {
-                               //getLog().info ( "Receiving msgs from: " + url+subContextPath );
-                               String reply = sender.sendAndWait(timeoutMs+10000L);                            
-                       //      System.out.println("Message received = "+reply);
-                               final JSONObject o =getResponseDataInJson(reply);
-                               //msgs.add(reply);
-                               if ( o != null )
-                               {
-                                       final JSONArray a = o.getJSONArray ( "result" );
-                               //      final int b = o.getInt("status" );
-                                       //if ( a != null && a.length()>0 )
-                                       if ( a != null)
-                                       {
-                                               for ( int i=0; i<a.length (); i++ )
-                                               {
-                                                       //msgs.add("DMAAP response status: "+Integer.toString(b));
-                                                       if (a.get(i) instanceof String)
-                                                               msgs.add ( a.getString(i) );
-                                                       else
-                                                       msgs.add ( a.getJSONObject(i).toString() );
-                                                       
-                                                       
+                               try {
+                                       // getLog().info ( "Receiving msgs from: " +
+                                       // url+subContextPath );
+                                       String reply = sender.sendAndWait(timeoutMs + 10000L);
+                                       final JSONObject o = getResponseDataInJson(reply);
+                                       // msgs.add(reply);
+                                       if (o != null) {
+                                               final JSONArray a = o.getJSONArray("result");
+                                               // final int b = o.getInt("status" );
+                                               // if ( a != null && a.length()>0 )
+                                               if (a != null) {
+                                                       for (int i = 0; i < a.length(); i++) {
+                                                               // msgs.add("DMAAP response status:
+                                                               // "+Integer.toString(b));
+                                                               if (a.get(i) instanceof String)
+                                                                       msgs.add(a.getString(i));
+                                                               else
+                                                                       msgs.add(a.getJSONObject(i).toString());
+
+                                                       }
                                                }
+                                               // else if(a != null && a.length()<1){
+                                               // msgs.add ("[]");
+                                               // }
                                        }
-//                                     else if(a != null && a.length()<1){
-//                                             msgs.add ("[]");                
-//                                             }
-                               }
-                       }       
-                       catch ( JSONException e )
-                               {
+                               } catch (JSONException e) {
                                        // unexpected response
-                                       reportProblemWithResponse ();
-                                    log.error("exception: ", e);
+                                       reportProblemWithResponse();
+                                       log.error("exception: ", e);
+                               } catch (HttpException e) {
+                                       throw new IOException(e);
                                }
-                               catch ( HttpException e )
-                               {
-                                       throw new IOException ( e );
-                               }       
                        }
-                       
+
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit );
-                       
-                               
-                               try
-                               {
-                                       final JSONObject o = get ( urlPath, username, password, protocolFlag );
-
-                                       if ( o != null )
-                                       {
-                                               final JSONArray a = o.getJSONArray ( "result" );
-                                               final int b = o.getInt("status" );
-                                               //if ( a != null && a.length()>0 )
-                                               if ( a != null)
-                                               {
-                                                       for ( int i=0; i<a.length (); i++ )
-                                                       {
-                                                               msgs.add("DMAAP response status: "+Integer.toString(b));
+                               // final String urlPath = createUrlPath
+                               // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
+                               // fId,props.getProperty("Protocol")), timeoutMs, limit );
+                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+                               try {
+                                       final JSONObject o = get(urlPath, username, password, protocolFlag);
+
+                                       if (o != null) {
+                                               final JSONArray a = o.getJSONArray("result");
+                                               final int b = o.getInt("status");
+                                               // if ( a != null && a.length()>0 )
+                                               if (a != null) {
+                                                       for (int i = 0; i < a.length(); i++) {
+                                                               // msgs.add("DMAAP response status:
+                                                               // "+Integer.toString(b));
                                                                if (a.get(i) instanceof String)
-                                                                       msgs.add ( a.getString(i) );
+                                                                       msgs.add(a.getString(i));
                                                                else
-                                                                       msgs.add ( a.getJSONObject(i).toString() );
-                                                               
+                                                                       msgs.add(a.getJSONObject(i).toString());
+
                                                        }
                                                }
-//                                             else if(a != null && a.length()<1)
-//                                                     {
-//                                                             msgs.add ("[]");                
-//                                                     }
+                                               // else if(a != null && a.length()<1)
+                                               // {
+                                               // msgs.add ("[]");
+                                               // }
                                        }
-                               }
-                               catch ( JSONException e )
-                               {
+                               } catch (JSONException e) {
                                        // unexpected response
-                                       reportProblemWithResponse ();
-                                    log.error("exception: ", e);
-                               }
-                               catch ( HttpException e )
-                               {
-                                       throw new IOException ( e );
+                                       reportProblemWithResponse();
+                                       log.error("exception: ", e);
+                               } catch (HttpException e) {
+                                       throw new IOException(e);
                                }
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit );
-                               
-
-                       try 
-                       {
-                               final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag );
-                               if ( o != null )
-                               {
-                                       final JSONArray a = o.getJSONArray ( "result" );
-                                       final int b = o.getInt("status" );
-                                       //if ( a != null && a.length()>0)
-                                       if ( a != null)
-                                       {
-                                               for ( int i=0; i<a.length (); i++ )
-                                               {
-                                                       msgs.add("DMAAP response status: "+Integer.toString(b));
-                                                       if (a.get(i) instanceof String)
-                                                               msgs.add ( a.getString(i) );
-                                                       else
-                                                       msgs.add ( a.getJSONObject(i).toString() );
-                                                       
+                               final String urlPath = createUrlPath(
+                                               MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+                                               timeoutMs, limit);
+
+                               try {
+                                       final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
+                                       if (o != null) {
+                                               final JSONArray a = o.getJSONArray("result");
+                                               final int b = o.getInt("status");
+                                               // if ( a != null && a.length()>0)
+                                               if (a != null) {
+                                                       for (int i = 0; i < a.length(); i++) {
+                                                               // msgs.add("DMAAP response status:
+                                                               // "+Integer.toString(b));
+                                                               if (a.get(i) instanceof String)
+                                                                       msgs.add(a.getString(i));
+                                                               else
+                                                                       msgs.add(a.getJSONObject(i).toString());
+
+                                                       }
                                                }
+                                               // else if(a != null && a.length()<1){
+                                               // msgs.add ("[]");
+                                               // }
                                        }
-//                                     else if(a != null && a.length()<1){
-//                                             msgs.add ("[]");                
-//                                             }
+                               } catch (JSONException e) {
+                                       // unexpected response
+                                       reportProblemWithResponse();
+                                       log.error("exception: ", e);
+                               } catch (HttpException e) {
+                                       throw new IOException(e);
                                }
+
                        }
-                       catch ( JSONException e )
-                       {
-                               // unexpected response
-                               reportProblemWithResponse ();
-                            log.error("exception: ", e);
-                       }
-                       catch ( HttpException e )
-                       {
-                               throw new IOException ( e );
-                       }
-                               
+                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+                               try {
+                                       final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
+                                       if (o != null) {
+                                               final JSONArray a = o.getJSONArray("result");
+                                               final int b = o.getInt("status");
+                                               // if ( a != null && a.length()>0)
+                                               if (a != null) {
+                                                       for (int i = 0; i < a.length(); i++) {
+                                                               // msgs.add("DMAAP response status:
+                                                               // "+Integer.toString(b));
+                                                               if (a.get(i) instanceof String)
+                                                                       msgs.add(a.getString(i));
+                                                               else
+                                                                       msgs.add(a.getJSONObject(i).toString());
+
+                                                       }
+                                               }
+
+                                       }
+                               } catch (JSONException e) {
+                                       // unexpected response
+                                       reportProblemWithResponse();
+                               } catch (HttpException e) {
+                                       throw new IOException(e);
+                               }
+
                        }
-                       
-               } catch ( JSONException e ) {
+
+               } catch (JSONException e) {
                        // unexpected response
-                       reportProblemWithResponse ();
-                    log.error("exception: ", e);
+                       reportProblemWithResponse();
+                       log.error("exception: ", e);
                } catch (HttpException e) {
                        throw new IOException(e);
-               } catch (Exception e ) {
+               } catch (Exception e) {
                        throw e;
                }
 
-
                return msgs;
        }
 
        private JSONObject getResponseDataInJson(String response) {
-       try {
-               
-               
-               //log.info("DMAAP response status: " + response.getStatus());
+               try {
+
+                       // log.info("DMAAP response status: " + response.getStatus());
 
-               //      final String responseData = response.readEntity(String.class);
+                       // final String responseData = response.readEntity(String.class);
+                       JSONTokener jsonTokener = new JSONTokener(response);
+                       JSONObject jsonObject = null;
+                       final char firstChar = jsonTokener.next();
+                       jsonTokener.back();
+                       if ('[' == firstChar) {
+                               JSONArray jsonArray = new JSONArray(jsonTokener);
+                               jsonObject = new JSONObject();
+                               jsonObject.put("result", jsonArray);
+                       } else {
+                               jsonObject = new JSONObject(jsonTokener);
+                       }
+
+                       return jsonObject;
+               } catch (JSONException excp) {
+                       // log.error("DMAAP - Error reading response data.", excp);
+                       return null;
+               }
+
+       }
+
+       private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
                JSONTokener jsonTokener = new JSONTokener(response);
                JSONObject jsonObject = null;
                final char firstChar = jsonTokener.next();
-       jsonTokener.back();
+               jsonTokener.back();
+               if (null != response && response.length() == 0) {
+                       return null;
+               }
+
                if ('[' == firstChar) {
                        JSONArray jsonArray = new JSONArray(jsonTokener);
                        jsonObject = new JSONObject();
                        jsonObject.put("result", jsonArray);
+               } else if ('{' == firstChar) {
+                       return null;
+               } else if ('<' == firstChar) {
+                       return null;
                } else {
                        jsonObject = new JSONObject(jsonTokener);
                }
 
                return jsonObject;
-       } catch (JSONException excp) {
-       //      log.error("DMAAP - Error reading response data.", excp);
-               return null;
-       }
-       
-       
-       
-}
-       
-       private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
-                       JSONTokener jsonTokener = new JSONTokener(response);
-                       JSONObject jsonObject = null;
-                       final char firstChar = jsonTokener.next();
-               jsonTokener.back();
-               if(null != response && response.length()==0){
-                       return null;
-               }
-               
-                       if ('[' == firstChar) {
-                               JSONArray jsonArray = new JSONArray(jsonTokener);
-                               jsonObject = new JSONObject();
-                               jsonObject.put("result", jsonArray);
-                       } else if('{' == firstChar){
-                               return null;
-                       } else if('<' == firstChar){
-                               return null;
-                       }else{
-                               jsonObject = new JSONObject(jsonTokener);
-                       }
 
-                       return jsonObject;
-               
        }
-       
+
        private final String fTopic;
        private final String fGroup;
        private final String fId;
@@ -330,187 +337,184 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
        private String username;
        private String password;
        private String host;
-        private  String latitude;
-               private  String longitude;
-               private  String version;
-               private  String serviceName;
-               private  String env;
-               private  String partner;
-               private String routeOffer;
-               private  String subContextPath;
-               private  String protocol;
-               private  String methodType;
-               private  String url;
-               private  String dmeuser;
-               private  String dmepassword;
-               private  String contenttype;
-           private DME2Client sender;
-               public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
-               public String consumerFilePath;
-               private String authKey;
-               private String authDate;
-        private Properties props;
-       private HashMap<String, String> DMETimeOuts;
-       private String handlers;
-       public static final String routerFilePath = null;
-       public static String getRouterFilePath() {
-               return routerFilePath;
-       }
-
-       public static void setRouterFilePath(String routerFilePath) {
-               MRSimplerBatchPublisher.routerFilePath = routerFilePath;
-       }
-               public String getConsumerFilePath() {
-                       return consumerFilePath;
-               }
+       HostSelector fHostSelector = null;
+       private String latitude;
+       private String longitude;
+       private String version;
+       private String serviceName;
+       private String env;
+       private String partner;
+       private String routeOffer;
+       private String subContextPath;
+       private String protocol;
+       private String methodType;
+       private String url;
+       private String dmeuser;
+       private String dmepassword;
+       private String contenttype;
+       private DME2Client sender;
+       public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+       public String consumerFilePath;
+       private String authKey;
+       private String authDate;
+       private Properties props;
+       private HashMap<String, String> DMETimeOuts;
+       private String handlers;
+       public static final String routerFilePath = null;
+
+       public static String getRouterFilePath() {
+               return routerFilePath;
+       }
 
-               public void setConsumerFilePath(String consumerFilePath) {
-                       this.consumerFilePath = consumerFilePath;
-               }
+       public static void setRouterFilePath(String routerFilePath) {
+               MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+       }
 
-               public String getProtocolFlag() {
-                       return protocolFlag;
-               }
+       public String getConsumerFilePath() {
+               return consumerFilePath;
+       }
+
+       public void setConsumerFilePath(String consumerFilePath) {
+               this.consumerFilePath = consumerFilePath;
+       }
 
-               public void setProtocolFlag(String protocolFlag) {
-                       this.protocolFlag = protocolFlag;
+       public String getProtocolFlag() {
+               return protocolFlag;
+       }
+
+       public void setProtocolFlag(String protocolFlag) {
+               this.protocolFlag = protocolFlag;
+       }
+
+       private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+               latitude = props.getProperty("Latitude");
+               longitude = props.getProperty("Longitude");
+               version = props.getProperty("Version");
+               serviceName = props.getProperty("ServiceName");
+               env = props.getProperty("Environment");
+               partner = props.getProperty("Partner");
+               routeOffer = props.getProperty("routeOffer");
+
+               subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
+               // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
+               // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
+               // timeoutMs);
+
+               protocol = props.getProperty("Protocol");
+               methodType = props.getProperty("MethodType");
+               dmeuser = props.getProperty("username");
+               dmepassword = props.getProperty("password");
+               contenttype = props.getProperty("contenttype");
+               handlers = props.getProperty("sessionstickinessrequired");
+               // url =protocol+"://DME2SEARCH/"+
+               // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
+               // url = protocol +
+               // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
+
+               /**
+                * Changes to DME2Client url to use Partner for auto failover between
+                * data centers When Partner value is not provided use the routeOffer
+                * value for auto failover within a cluster
+                */
+
+               String preferredRouteKey = readRoute("preferredRouteKey");
+
+               if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
+                       url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
+                                       + "&routeoffer=" + preferredRouteKey;
+               } else if (partner != null && !partner.isEmpty()) {
+                       url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+               } else if (routeOffer != null && !routeOffer.isEmpty()) {
+                       url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+                                       + routeOffer;
                }
-               
-               private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{ 
-                       latitude = props.getProperty("Latitude");
-                       longitude = props.getProperty("Longitude");
-                       version = props.getProperty("Version");
-                       serviceName = props.getProperty("ServiceName");
-                       env = props.getProperty("Environment");
-                       partner = props.getProperty("Partner");
-                       routeOffer = props.getProperty("routeOffer");
-                       
-                       subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId;
-               //      subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
-                       //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs);
-                       
-                       protocol = props.getProperty("Protocol"); 
-                       methodType = props.getProperty("MethodType");
-                       dmeuser = props.getProperty("username");
-                       dmepassword = props.getProperty("password");
-                       contenttype = props.getProperty("contenttype");
-                       handlers = props.getProperty("sessionstickinessrequired");
-                       //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
-               //      url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
-               
-                       /**
-                        * Changes to DME2Client url to use Partner for auto failover between data centers
-                        * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
-                        */
-                       
-                       String preferredRouteKey = readRoute("preferredRouteKey");
-                                               
-                       if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) 
-                       { 
-                               url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey; 
-                       }else  if (partner != null && !partner.isEmpty()) 
-                       { 
-                               url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
-                       }
-                       else if (routeOffer!=null && !routeOffer.isEmpty()) 
-                       { 
-                               url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
-                       }
-                       
-                       //log.info("url :"+url);
-                                               
-                       if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs;
-                       if(limit != -1 )url=url+"&limit="+limit;
-
-                       DMETimeOuts = new HashMap<String, String>();
-                       DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
-                       DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
-                       DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
-                       DMETimeOuts.put("Content-Type", contenttype);
-                       System.setProperty("AFT_LATITUDE", latitude);
-                       System.setProperty("AFT_LONGITUDE", longitude);
-                       System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
-               //      System.setProperty("DME2.DEBUG", "true");
-                       
-                       //SSL changes
-                       System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-                                       "SSLv3,TLSv1,TLSv1.1");
-                       System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
-                       System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-                       //SSL changes
-            
-                       sender = new DME2Client(new URI(url), timeoutMs+10000L);
-                       sender.setAllowAllHttpReturnCodes(true);
-                       sender.setMethod(methodType);
-                       sender.setSubContext(subContextPath);   
-                       if(dmeuser != null && dmepassword != null){
+
+               // log.info("url :"+url);
+
+               if (timeoutMs != -1)
+                       url = url + "&timeout=" + timeoutMs;
+               if (limit != -1)
+                       url = url + "&limit=" + limit;
+
+               // Add filter to DME2 Url
+               if (fFilter != null && fFilter.length() > 0)
+                       url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
+
+               DMETimeOuts = new HashMap<String, String>();
+               DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+               DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+               DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+               DMETimeOuts.put("Content-Type", contenttype);
+               System.setProperty("AFT_LATITUDE", latitude);
+               System.setProperty("AFT_LONGITUDE", longitude);
+               System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+               // System.setProperty("DME2.DEBUG", "true");
+
+               // SSL changes
+               // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+               // "SSLv3,TLSv1,TLSv1.1");
+               System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+               System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+               System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+               // SSL changes
+
+               sender = new DME2Client(new URI(url), timeoutMs + 10000L);
+               sender.setAllowAllHttpReturnCodes(true);
+               sender.setMethod(methodType);
+               sender.setSubContext(subContextPath);
+               if (dmeuser != null && dmepassword != null) {
                        sender.setCredentials(dmeuser, dmepassword);
-                       //System.out.println(dmepassword);
-                       }
-                       sender.setHeaders(DMETimeOuts);
-                       sender.setPayload("");               
-                       
-                       if(handlers.equalsIgnoreCase("yes")){
-                               sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
-                               sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
-                               sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
-                               }else{
-                                       sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
-                               }
-               /*      HeaderReplyHandler headerhandler= new HeaderReplyHandler(); 
-                       sender.setReplyHandler(headerhandler);*/
-//                     } catch (DME2Exception x) {
-//                             getLog().warn(x.getMessage(), x);
-//                             System.out.println("XXXXXXXXXXXX"+x);
-//                     } catch (URISyntaxException x) {
-//                             System.out.println(x);
-//                             getLog().warn(x.getMessage(), x);
-//                     } catch (Exception x) {
-//                             System.out.println("XXXXXXXXXXXX"+x);
-//                             getLog().warn(x.getMessage(), x);
-//                     }
                }
-       public Properties getProps() {
-                       return props;
+               sender.setHeaders(DMETimeOuts);
+               sender.setPayload("");
+
+               if (handlers.equalsIgnoreCase("yes")) {
+                       sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+                                       props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+                       sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+                       sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+               } else {
+                       sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
                }
+       }
 
-               public void setProps(Properties props) {
-                       this.props = props;
-               }
+       public Properties getProps() {
+               return props;
+       }
+
+       public void setProps(Properties props) {
+               this.props = props;
+       }
 
-       protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException
-       {
-               final StringBuffer contexturl= new StringBuffer(url);
-       //      final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
-               final StringBuffer adds = new StringBuffer ();
-               if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs ); 
-               if ( limit > -1 )
-               {
-                       if ( adds.length () > 0 )
-                       {
-                               adds.append ( "&" );
+       protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
+               final StringBuffer contexturl = new StringBuffer(url);
+               // final StringBuffer url = new StringBuffer (
+               // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
+               final StringBuffer adds = new StringBuffer();
+               if (timeoutMs > -1)
+                       adds.append("timeout=").append(timeoutMs);
+               if (limit > -1) {
+                       if (adds.length() > 0) {
+                               adds.append("&");
                        }
-                       adds.append ( "limit=" ).append ( limit );
+                       adds.append("limit=").append(limit);
                }
-               if ( fFilter != null && fFilter.length () > 0 )
-               {
+               if (fFilter != null && fFilter.length() > 0) {
                        try {
-                               if ( adds.length () > 0 )
-                               {
-                                       adds.append ( "&" );
+                               if (adds.length() > 0) {
+                                       adds.append("&");
                                }
                                adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                                throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
                        }
                }
-               if ( adds.length () > 0 )
-               {
-                       contexturl.append ( "?" ).append ( adds.toString () );
+               if (adds.length() > 0) {
+                       contexturl.append("?").append(adds.toString());
                }
-               
-               //sender.setSubContext(url.toString());
-               return contexturl.toString ();
+
+               // sender.setSubContext(url.toString());
+               return contexturl.toString();
        }
 
        public String getUsername() {
@@ -560,20 +564,20 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
        public void setfFilter(String fFilter) {
                this.fFilter = fFilter;
        }
-       
+
        private String readRoute(String routeKey) {
 
                try {
-                       
-                       MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath)));
+
+                       MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
 
                } catch (Exception ex) {
-                       log.error("Reply Router Error " + ex.toString() );
+                       log.error("Reply Router Error " + ex.toString());
                }
-               String routeOffer = MRClientFactory.prop.getProperty(routeKey);         
+               String routeOffer = MRClientFactory.prop.getProperty(routeKey);
                return routeOffer;
        }
-       
+
        @Override
        public MRConsumerResponse fetchWithReturnConsumerResponse() {
 
@@ -582,13 +586,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
        }
 
        @Override
-       public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
-                       int limit) {
+       public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
                final LinkedList<String> msgs = new LinkedList<String>();
                MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
                try {
-                       if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(
-                                       protocolFlag)) {
+                       if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
                                DMEConfigure(timeoutMs, limit);
 
                                String reply = sender.sendAndWait(timeoutMs + 10000L);
@@ -599,7 +601,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
                                        final JSONArray a = o.getJSONArray("result");
 
                                        if (a != null) {
-                                               for (int i = 0; i < a.length(); i++) {                                                  
+                                               for (int i = 0; i < a.length(); i++) {
                                                        if (a.get(i) instanceof String)
                                                                msgs.add(a.getString(i));
                                                        else
@@ -612,15 +614,16 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
                                createMRConsumerResponse(reply, mrConsumerResponse);
                        }
 
-                       if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(
-                                       protocolFlag)) {
-                               final String urlPath = createUrlPath(
-                                               MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
-                                                               props.getProperty("Protocol")), timeoutMs,
-                                               limit);
+                       if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                               /*
+                                * final String urlPath = createUrlPath(
+                                * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+                                * props.getProperty("Protocol")), timeoutMs, limit);
+                                */
 
-                               String response = getResponse(urlPath, username, password,
-                                               protocolFlag);
+                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+                               String response = getResponse(urlPath, username, password, protocolFlag);
 
                                final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
 
@@ -628,7 +631,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
                                        final JSONArray a = o.getJSONArray("result");
 
                                        if (a != null) {
-                                               for (int i = 0; i < a.length(); i++) {                                                  
+                                               for (int i = 0; i < a.length(); i++) {
                                                        if (a.get(i) instanceof String)
                                                                msgs.add(a.getString(i));
                                                        else
@@ -641,16 +644,13 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
                                createMRConsumerResponse(response, mrConsumerResponse);
                        }
 
-                       if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(
-                                       protocolFlag)) {
+                       if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
                                final String urlPath = createUrlPath(
-                                               MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
-                                                               props.getProperty("Protocol")), timeoutMs,
-                                               limit);
+                                               MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+                                               timeoutMs, limit);
 
-                               String response  = getAuthResponse(urlPath, authKey, authDate,
-                                               username, password, protocolFlag);
-                               final JSONObject o = getResponseDataInJsonWithResponseReturned(response);                       
+                               String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
+                               final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
                                if (o != null) {
                                        final JSONArray a = o.getJSONArray("result");
 
@@ -667,51 +667,74 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer
                                }
                                createMRConsumerResponse(response, mrConsumerResponse);
                        }
-                       
-                       
-                       
-               } catch (JSONException e) {     
+                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                               // final String urlPath = createUrlPath(
+                               // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+                               // props.getProperty("Protocol")), timeoutMs,
+                               // limit);
+                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+                               String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
+                               final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+                               if (o != null) {
+                                       final JSONArray a = o.getJSONArray("result");
+
+                                       if (a != null) {
+                                               for (int i = 0; i < a.length(); i++) {
+                                                       if (a.get(i) instanceof String)
+                                                               msgs.add(a.getString(i));
+                                                       else
+                                                               msgs.add(a.getJSONObject(i).toString());
+
+                                               }
+                                       }
+
+                               }
+                               createMRConsumerResponse(response, mrConsumerResponse);
+                       }
+
+               } catch (JSONException e) {
                        mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        mrConsumerResponse.setResponseMessage(e.getMessage());
-                        log.error("json exception: ", e);
-               } catch (HttpException e) {                     
+                       log.error("json exception: ", e);
+               } catch (HttpException e) {
                        mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        mrConsumerResponse.setResponseMessage(e.getMessage());
-                        log.error("http exception: ", e);
-               }catch(DME2Exception e){                        
+                       log.error("http exception: ", e);
+               } catch (DME2Exception e) {
                        mrConsumerResponse.setResponseCode(e.getErrorCode());
                        mrConsumerResponse.setResponseMessage(e.getErrorMessage());
-                        log.error("DME2 exception: ", e);
-               }catch (Exception e) {                  
+                       log.error("DME2 exception: ", e);
+               } catch (Exception e) {
                        mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        mrConsumerResponse.setResponseMessage(e.getMessage());
-                        log.error("exception: ", e);
+                       log.error("exception: ", e);
                }
                mrConsumerResponse.setActualMessages(msgs);
                return mrConsumerResponse;
        }
 
        private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
-               
-               if(reply.startsWith("{")){
+
+               if (reply.startsWith("{")) {
                        JSONObject jObject = new JSONObject(reply);
                        String message = jObject.getString("message");
                        int status = jObject.getInt("status");
-               
+
                        mrConsumerResponse.setResponseCode(Integer.toString(status));
-                       
-                       if(null != message){
-                               mrConsumerResponse.setResponseMessage(message); 
-                       }       
-               }else if (reply.startsWith("<")){
+
+                       if (null != message) {
+                               mrConsumerResponse.setResponseMessage(message);
+                       }
+               } else if (reply.startsWith("<")) {
                        mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
-                       mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));                      
-               }else{
+                       mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+               } else {
                        mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
-                       mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); 
+                       mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
                }
-               
+
        }
 
 }
index 398558d..db982ec 100644 (file)
@@ -47,6 +47,7 @@ import org.apache.http.HttpException;
 import org.apache.http.HttpStatus;
 import org.json.JSONArray;
 import org.json.JSONObject;
+import org.json.JSONTokener;
 
 import com.att.aft.dme2.api.DME2Client;
 import com.att.aft.dme2.api.DME2Exception;
@@ -55,76 +56,66 @@ import com.att.nsa.mr.client.MRBatchingPublisher;
 import com.att.nsa.mr.client.response.MRPublisherResponse;
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 
-public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
-{
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
        private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
 
-       public static class Builder 
-       {
-               public Builder ()
-               {
+       public static class Builder {
+               public Builder() {
                }
 
-               public Builder againstUrls ( Collection<String> baseUrls )
-               {
+               public Builder againstUrls(Collection<String> baseUrls) {
                        fUrls = baseUrls;
                        return this;
                }
 
-               public Builder onTopic ( String topic )
-               {
+               public Builder onTopic(String topic) {
                        fTopic = topic;
                        return this;
                }
 
-               public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
-               {
+               public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
                        fMaxBatchSize = maxBatchSize;
                        fMaxBatchAgeMs = maxBatchAgeMs;
                        return this;
                }
 
-               public Builder compress ( boolean compress )
-               {
+               public Builder compress(boolean compress) {
                        fCompress = compress;
                        return this;
                }
-               
-               public Builder httpThreadTime ( int threadOccuranceTime )
-               {
+
+               public Builder httpThreadTime(int threadOccuranceTime) {
                        this.threadOccuranceTime = threadOccuranceTime;
                        return this;
                }
-               
-               public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
-               {
+
+               public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
                        fAllowSelfSignedCerts = allowSelfSignedCerts;
                        return this;
                }
-               
-               public Builder withResponse ( boolean withResponse)
-               {
+
+               public Builder withResponse(boolean withResponse) {
                        fWithResponse = withResponse;
                        return this;
                }
-               public MRSimplerBatchPublisher build ()
-               {
-                       if(!fWithResponse) 
-                       {
+
+               public MRSimplerBatchPublisher build() {
+                       if (!fWithResponse) {
                                try {
-                                       return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
+                                       return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+                                                       fAllowSelfSignedCerts, threadOccuranceTime);
                                } catch (MalformedURLException e) {
                                        throw new RuntimeException(e);
                                }
-                       } else 
-                       {
+                       } else {
                                try {
-                                       return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
+                                       return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+                                                       fAllowSelfSignedCerts, fMaxBatchSize);
                                } catch (MalformedURLException e) {
                                        throw new RuntimeException(e);
                                }
                        }
-                               
+
                }
 
                private Collection<String> fUrls;
@@ -135,262 +126,250 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                private int threadOccuranceTime = 50;
                private boolean fAllowSelfSignedCerts = false;
                private boolean fWithResponse = false;
-               
+
        };
 
        @Override
-       public int send ( String partition, String msg )
-       {
-               return send ( new message ( partition, msg ) );
+       public int send(String partition, String msg) {
+               return send(new message(partition, msg));
        }
+
        @Override
-       public int send ( String msg )
-       {
-               return send ( new message ( null, msg ) );
+       public int send(String msg) {
+               return send(new message(null, msg));
        }
 
-
        @Override
-       public int send ( message msg )
-       {
-               final LinkedList<message> list = new LinkedList<message> ();
-               list.add ( msg );
-               return send ( list );
+       public int send(message msg) {
+               final LinkedList<message> list = new LinkedList<message>();
+               list.add(msg);
+               return send(list);
        }
-       
-       
 
        @Override
-       public synchronized int send ( Collection<message> msgs )
-       {
-               if ( fClosed )
-               {
-                       throw new IllegalStateException ( "The publisher was closed." );
+       public synchronized int send(Collection<message> msgs) {
+               if (fClosed) {
+                       throw new IllegalStateException("The publisher was closed.");
                }
-               
-               for ( message userMsg : msgs )
-               {
-                       fPending.add ( new TimestampedMessage ( userMsg ) );
+
+               for (message userMsg : msgs) {
+                       fPending.add(new TimestampedMessage(userMsg));
                }
-               return getPendingMessageCount ();
+               return getPendingMessageCount();
        }
 
        @Override
-       public synchronized int getPendingMessageCount ()
-       {
-               return fPending.size ();
+       public synchronized int getPendingMessageCount() {
+               return fPending.size();
        }
 
        @Override
-       public void close ()
-       {
-               try
-               {
-                       final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
-                       if ( remains.size() > 0 )
-                       {
-                               getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
-                                       + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
+       public void close() {
+               try {
+                       final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+                       if (remains.size() > 0) {
+                               getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+                                               + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
                        }
-               }
-               catch ( InterruptedException e )
-               {
-                       getLog().warn ( "Possible message loss. " + e.getMessage(), e );
-               }
-               catch ( IOException e )
-               {
-                       getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+               } catch (InterruptedException e) {
+                       getLog().warn("Possible message loss. " + e.getMessage(), e);
+               } catch (IOException e) {
+                       getLog().warn("Possible message loss. " + e.getMessage(), e);
                }
        }
 
        @Override
-       public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
-       {
-               synchronized ( this )
-               {
+       public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+               synchronized (this) {
                        fClosed = true;
 
                        // stop the background sender
-                       fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
-                       fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
-                       fExec.shutdown ();
+                       fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+                       fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+                       fExec.shutdown();
                }
 
-               final long now = Clock.now ();
-               final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+               final long now = Clock.now();
+               final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
                final long timeoutAtMs = now + waitInMs;
 
-               while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
-               {
-                       send ( true );
-                       Thread.sleep ( 250 );
+               while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+                       send(true);
+                       Thread.sleep(250);
                }
 
-               synchronized ( this )
-               {
-                       final LinkedList<message> result = new LinkedList<message> ();
-                       fPending.drainTo ( result );
+               synchronized (this) {
+                       final LinkedList<message> result = new LinkedList<message>();
+                       fPending.drainTo(result);
                        return result;
                }
        }
 
        /**
-        * Possibly send a batch to the MR server. This is called by the background thread
-        * and the close() method
+        * Possibly send a batch to the MR server. This is called by the background
+        * thread and the close() method
         * 
         * @param force
         */
-       private synchronized void send ( boolean force )
-       {
-               if ( force || shouldSendNow () )
-               {
-                       if ( !sendBatch () )
-                       {
-                               getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+       private synchronized void send(boolean force) {
+               if (force || shouldSendNow()) {
+                       if (!sendBatch()) {
+                               getLog().warn("Send failed, " + fPending.size() + " message to send.");
 
                                // note the time for back-off
-                               fDontSendUntilMs = sfWaitAfterError + Clock.now ();
+                               fDontSendUntilMs = sfWaitAfterError + Clock.now();
                        }
                }
        }
 
-       private synchronized boolean shouldSendNow ()
-       {
+       private synchronized boolean shouldSendNow() {
                boolean shouldSend = false;
-               if ( fPending.size () > 0 )
-               {
-                       final long nowMs = Clock.now ();
+               if (fPending.size() > 0) {
+                       final long nowMs = Clock.now();
 
-                       shouldSend = ( fPending.size() >= fMaxBatchSize );
-                       if ( !shouldSend )
-                       {
+                       shouldSend = (fPending.size() >= fMaxBatchSize);
+                       if (!shouldSend) {
                                final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
                                shouldSend = sendAtMs <= nowMs;
                        }
 
                        // however, wait after an error
-                       shouldSend = shouldSend && nowMs >= fDontSendUntilMs; 
+                       shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
                }
                return shouldSend;
        }
 
-       private synchronized boolean sendBatch ()
-       {
-               // it's possible for this call to be made with an empty list. in this case, just return.
-               if ( fPending.size() < 1 )
-               {
+       /**
+        * Method to parse published JSON Objects and Arrays
+        * 
+        * @return JSONArray
+        */
+       private JSONArray parseJSON() {
+               JSONArray jsonArray = new JSONArray();
+               for (TimestampedMessage m : fPending) {
+                       JSONTokener jsonTokener = new JSONTokener(m.fMsg);
+                       JSONObject jsonObject = null;
+                       JSONArray tempjsonArray = null;
+                       final char firstChar = jsonTokener.next();
+                       jsonTokener.back();
+                       if ('[' == firstChar) {
+                               tempjsonArray = new JSONArray(jsonTokener);
+                               if (null != tempjsonArray) {
+                                       for (int i = 0; i < tempjsonArray.length(); i++) {
+                                               jsonArray.put(tempjsonArray.getJSONObject(i));
+                                       }
+                               }
+                       } else {
+                               jsonObject = new JSONObject(jsonTokener);
+                               jsonArray.put(jsonObject);
+                       }
+
+               }
+               return jsonArray;
+       }
+
+       private synchronized boolean sendBatch() {
+               // it's possible for this call to be made with an empty list. in this
+               // case, just return.
+               if (fPending.size() < 1) {
                        return true;
                }
 
-               final long nowMs = Clock.now ();
-               
+               final long nowMs = Clock.now();
+
                host = this.fHostSelector.selectBaseHost();
-               
-               final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-               
-
-               try
-               {
-                       /*final String contentType =
-                               fCompress ?
-                                       MRFormat.CAMBRIA_ZIP.toString () :
-                                       MRFormat.CAMBRIA.toString () 
-                       ;*/
-            
-                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+
+               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+                               props.getProperty("partition"));
+
+               try {
+                       /*
+                        * final String contentType = fCompress ?
+                        * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
+                        */
+
+                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
                        OutputStream os = baseStream;
                        final String contentType = props.getProperty("contenttype");
-                       if(contentType.equalsIgnoreCase("application/json")){
-                               JSONArray jsonArray = new JSONArray();
-                               for ( TimestampedMessage m : fPending )
-                               {
-                                       JSONObject jsonObject = new JSONObject(m.fMsg);
-                                                               
-                                               jsonArray.put(jsonObject);
-                               
+                       if (contentType.equalsIgnoreCase("application/json")) {
+                               JSONArray jsonArray = parseJSON();
+                               os.write(jsonArray.toString().getBytes());
+                               os.close();
+
+                       } else if (contentType.equalsIgnoreCase("text/plain")) {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
+                               }
+                               os.close();
+                       } else if (contentType.equalsIgnoreCase("application/cambria")
+                                       || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+                               if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+                                       os = new GZIPOutputStream(baseStream);
+                               }
+                               for (TimestampedMessage m : fPending) {
+
+                                       os.write(("" + m.fPartition.length()).getBytes());
+                                       os.write('.');
+                                       os.write(("" + m.fMsg.length()).getBytes());
+                                       os.write('.');
+                                       os.write(m.fPartition.getBytes());
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
                                }
-                               os.write (jsonArray.toString().getBytes() );    
                                os.close();
+                       } else {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
 
-                               }else if (contentType.equalsIgnoreCase("text/plain")){
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                                       os.close ();
-                               } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
-                                       if ( contentType.equalsIgnoreCase("application/cambria-zip") )
-                                       {
-                                               os = new GZIPOutputStream ( baseStream );
-                                       }
-                                       for ( TimestampedMessage m : fPending )
-                                       {
-                                               
-                                               os.write ( ( "" + m.fPartition.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( ( "" + m.fMsg.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( m.fPartition.getBytes() );
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                                       os.close ();
-                               }else{
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                       
-                                       }
-                                       os.close ();
                                }
-             
-               
+                               os.close();
+                       }
 
-                       final long startMs = Clock.now ();
+                       final long startMs = Clock.now();
                        if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-                               
-                       
+
                                DME2Configue();
-                               
+
                                Thread.sleep(5);
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               sender.setPayload(os.toString());               
+                               getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               sender.setPayload(os.toString());
                                String dmeResponse = sender.sendAndWait(5000L);
-                               
-                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
-                                               + dmeResponse.toString();
+
+                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
                                getLog().info(logLine);
                                fPending.clear();
                                return true;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
+                                               username, password, protocolFlag);
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
                                        return false;
                                }
                                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
                                getLog().info(logLine);
                                fPending.clear();
                                return true;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-                               
-                               
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
+                                               protocolFlag);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
                                        return false;
                                }
                                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
@@ -398,118 +377,100 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                fPending.clear();
                                return true;
                        }
-               }
-               catch ( IllegalArgumentException x ) {
-                       getLog().warn ( x.getMessage(), x );
-               } catch ( IOException x ) {
-                       getLog().warn ( x.getMessage(), x );
+               } catch (IllegalArgumentException x) {
+                       getLog().warn(x.getMessage(), x);
+               } catch (IOException x) {
+                       getLog().warn(x.getMessage(), x);
                } catch (HttpException x) {
-                       getLog().warn ( x.getMessage(), x );
+                       getLog().warn(x.getMessage(), x);
                } catch (Exception x) {
                        getLog().warn(x.getMessage(), x);
                }
                return false;
        }
 
-       public synchronized MRPublisherResponse sendBatchWithResponse () 
-       {
-               // it's possible for this call to be made with an empty list. in this case, just return.
-               if ( fPending.size() < 1 )
-               {
+       public synchronized MRPublisherResponse sendBatchWithResponse() {
+               // it's possible for this call to be made with an empty list. in this
+               // case, just return.
+               if (fPending.size() < 1) {
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage("No Messages to send");
                        return pubResponse;
                }
 
-               final long nowMs = Clock.now ();
-               
+               final long nowMs = Clock.now();
+
                host = this.fHostSelector.selectBaseHost();
-               
-               final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-               OutputStream os=null;
-               try
-               {
-                       
-                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
-                        os = baseStream;
+
+               final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+                               props.getProperty("partition"));
+               OutputStream os = null;
+               try {
+
+                       final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+                       os = baseStream;
                        final String contentType = props.getProperty("contenttype");
-                       if(contentType.equalsIgnoreCase("application/json")){
-                               JSONArray jsonArray = new JSONArray();
-                               for ( TimestampedMessage m : fPending )
-                               {
-                                       JSONObject jsonObject = new JSONObject(m.fMsg);
-                                                               
-                                               jsonArray.put(jsonObject);
-                               
+                       if (contentType.equalsIgnoreCase("application/json")) {
+                               JSONArray jsonArray = parseJSON();
+                               os.write(jsonArray.toString().getBytes());
+                       } else if (contentType.equalsIgnoreCase("text/plain")) {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
                                }
-                               os.write (jsonArray.toString().getBytes() );    
-                               }else if (contentType.equalsIgnoreCase("text/plain")){
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                               } else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
-                                       if ( contentType.equalsIgnoreCase("application/cambria-zip") )
-                                       {
-                                               os = new GZIPOutputStream ( baseStream );
-                                       }
-                                       for ( TimestampedMessage m : fPending )
-                                       {
-                                               
-                                               os.write ( ( "" + m.fPartition.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( ( "" + m.fMsg.length () ).getBytes() );
-                                               os.write ( '.' );
-                                               os.write ( m.fPartition.getBytes() );
-                                               os.write ( m.fMsg.getBytes() );
-                                               os.write ( '\n' );
-                                       }
-                                       os.close ();
-                               }else{
-                                       for ( TimestampedMessage m : fPending )
-                                       {                                                                               
-                                               os.write ( m.fMsg.getBytes() );
-                                       
-                                       }
+                       } else if (contentType.equalsIgnoreCase("application/cambria")
+                                       || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+                               if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+                                       os = new GZIPOutputStream(baseStream);
+                               }
+                               for (TimestampedMessage m : fPending) {
+
+                                       os.write(("" + m.fPartition.length()).getBytes());
+                                       os.write('.');
+                                       os.write(("" + m.fMsg.length()).getBytes());
+                                       os.write('.');
+                                       os.write(m.fPartition.getBytes());
+                                       os.write(m.fMsg.getBytes());
+                                       os.write('\n');
                                }
-             
-               
+                               os.close();
+                       } else {
+                               for (TimestampedMessage m : fPending) {
+                                       os.write(m.fMsg.getBytes());
 
-                       final long startMs = Clock.now ();
+                               }
+                       }
+
+                       final long startMs = Clock.now();
                        if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-                               
-                       
+
                                try {
-                               DME2Configue();
-                               
-                               Thread.sleep(5);
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               sender.setPayload(os.toString());               
-                                               
-                               
-                               String dmeResponse = sender.sendAndWait(5000L);
-                               System.out.println("dmeres->"+dmeResponse);             
-                               
-                               
-                               pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
-                               
-                               if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-                                       
-                                       return pubResponse;
-                               }
-                               final String logLine = String.valueOf((Clock.now() - startMs))
-                                               + dmeResponse.toString();
-                               getLog().info(logLine);
-                               fPending.clear();
-                               
-                               }
-                               catch (DME2Exception x) {
+                                       DME2Configue();
+
+                                       Thread.sleep(5);
+                                       getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+                                                       + (nowMs - fPending.peek().timestamp) + " ms");
+                                       sender.setPayload(os.toString());
+
+                                       String dmeResponse = sender.sendAndWait(5000L);
+
+                                       pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
+
+                                       if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                                       || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+                                               return pubResponse;
+                                       }
+                                       final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
+                                       getLog().info(logLine);
+                                       fPending.clear();
+
+                               } catch (DME2Exception x) {
                                        getLog().warn(x.getMessage(), x);
                                        pubResponse.setResponseCode(x.getErrorCode());
                                        pubResponse.setResponseMessage(x.getErrorMessage());
                                } catch (URISyntaxException x) {
-                                       
+
                                        getLog().warn(x.getMessage(), x);
                                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                                        pubResponse.setResponseMessage(x.getMessage());
@@ -517,135 +478,127 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
                                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                                        pubResponse.setResponseMessage(x.getMessage());
-                                        logger.error("exception: ", x);
-                                       
+                                       logger.error("exception: ", x);
+
                                }
-                               
+
                                return pubResponse;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               
-                               
-                               pubResponse = createMRPublisherResponse(result,pubResponse);
-                               
-                               if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-                                       
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
+                                               authDate, username, password, protocolFlag);
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+
+                               pubResponse = createMRPublisherResponse(result, pubResponse);
+
+                               if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                               || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
                                        return pubResponse;
                                }
-                               
+
                                final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
                                getLog().info(logLine);
                                fPending.clear();
                                return pubResponse;
-                       } 
-                       
+                       }
+
                        if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-                               final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-                               
-                               //System.out.println(result.getInt("status"));
-                               //Here we are checking for error response. If HTTP status
-                               //code is not within the http success response code
-                               //then we consider this as error and return false
-                               pubResponse = createMRPublisherResponse(result,pubResponse);
-                               
-                               if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-                                       
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
+                                               password, protocolFlag);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               pubResponse = createMRPublisherResponse(result, pubResponse);
+
+                               if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                               || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
                                        return pubResponse;
                                }
-                               
+
                                final String logLine = String.valueOf((Clock.now() - startMs));
                                getLog().info(logLine);
                                fPending.clear();
                                return pubResponse;
                        }
-               }
-               catch ( IllegalArgumentException x ) {
-                       getLog().warn ( x.getMessage(), x );
+               } catch (IllegalArgumentException x) {
+                       getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
-               } catch ( IOException x ) {
-                       getLog().warn ( x.getMessage(), x );
+
+               } catch (IOException x) {
+                       getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
+
                } catch (HttpException x) {
-                       getLog().warn ( x.getMessage(), x );
+                       getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
+
                } catch (Exception x) {
                        getLog().warn(x.getMessage(), x);
-                       
+
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                        pubResponse.setResponseMessage(x.getMessage());
-                       
+
                }
-               
+
                finally {
-                       if (fPending.size()>0) {
-                               getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+                       if (fPending.size() > 0) {
+                               getLog().warn("Send failed, " + fPending.size() + " message to send.");
                                pubResponse.setPendingMsgs(fPending.size());
                        }
                        if (os != null) {
                                try {
-                               os.close();
+                                       os.close();
                                } catch (Exception x) {
                                        getLog().warn(x.getMessage(), x);
                                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
                                        pubResponse.setResponseMessage("Error in closing Output Stream");
                                }
-                               }
+                       }
                }
-               
+
                return pubResponse;
        }
-       
-private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
-               
-        if (reply.isEmpty()) 
-        {
-                
-                mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
-                mrPubResponse.setResponseMessage("Please verify the Producer properties");
-        }
-        else if(reply.startsWith("{"))
-        {
+
+       private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+               if (reply.isEmpty()) {
+
+                       mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+                       mrPubResponse.setResponseMessage("Please verify the Producer properties");
+               } else if (reply.startsWith("{")) {
                        JSONObject jObject = new JSONObject(reply);
-                       if(jObject.has("message") && jObject.has("status"))
-                       {
+                       if (jObject.has("message") && jObject.has("status")) {
                                String message = jObject.getString("message");
-                               if(null != message)
-                               {
-                                       mrPubResponse.setResponseMessage(message);      
+                               if (null != message) {
+                                       mrPubResponse.setResponseMessage(message);
                                }
                                mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+                       } else {
+                               mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+                               mrPubResponse.setResponseMessage(reply);
                        }
-                       else
-                        {
-                                       mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
-                                       mrPubResponse.setResponseMessage(reply);        
-                        }
-     }
-        else if (reply.startsWith("<"))
-        {
-                String responseCode = getHTTPErrorResponseCode(reply);
-                if( responseCode.contains("403"))
-                       {
-                        responseCode = "403";
-                       }       
-                mrPubResponse.setResponseCode(responseCode);
-                       mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));   
-        }
-        
+               } else if (reply.startsWith("<")) {
+                       String responseCode = getHTTPErrorResponseCode(reply);
+                       if (responseCode.contains("403")) {
+                               responseCode = "403";
+                       }
+                       mrPubResponse.setResponseCode(responseCode);
+                       mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+               }
+
                return mrPubResponse;
        }
 
@@ -658,10 +611,10 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        private String username;
        private String password;
        private String host;
-       
-       //host selector
+
+       // host selector
        private HostSelector fHostSelector = null;
-       
+
        private final LinkedBlockingQueue<TimestampedMessage> fPending;
        private long fDontSendUntilMs;
        private final ScheduledThreadPoolExecutor fExec;
@@ -684,25 +637,24 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        private HashMap<String, String> DMETimeOuts;
        private DME2Client sender;
        public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
-       public String producerFilePath;
        private String authKey;
        private String authDate;
        private String handlers;
        private Properties props;
        public static String routerFilePath;
-       protected static final Map<String, String> headers=new HashMap<String, String>();
+       protected static final Map<String, String> headers = new HashMap<String, String>();
        public static MultivaluedMap<String, Object> headersMap;
-       
-       
+
        private MRPublisherResponse pubResponse;
-       
+
        public MRPublisherResponse getPubResponse() {
                return pubResponse;
        }
+
        public void setPubResponse(MRPublisherResponse pubResponse) {
                this.pubResponse = pubResponse;
        }
-       
+
        public static String getRouterFilePath() {
                return routerFilePath;
        }
@@ -719,14 +671,6 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                this.props = props;
        }
 
-       public String getProducerFilePath() {
-               return producerFilePath;
-       }
-
-       public void setProducerFilePath(String producerFilePath) {
-               this.producerFilePath = producerFilePath;
-       }
-
        public String getProtocolFlag() {
                return protocolFlag;
        }
@@ -734,14 +678,14 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        public void setProtocolFlag(String protocolFlag) {
                this.protocolFlag = protocolFlag;
        }
-       
-       
+
        private void DME2Configue() throws Exception {
                try {
-                       
-               /*      FileReader reader = new FileReader(new File (producerFilePath));
-                       Properties props = new Properties();            
-                       props.load(reader);*/
+
+                       /*
+                        * FileReader reader = new FileReader(new File (producerFilePath));
+                        * Properties props = new Properties(); props.load(reader);
+                        */
                        latitude = props.getProperty("Latitude");
                        longitude = props.getProperty("Longitude");
                        version = props.getProperty("Version");
@@ -749,41 +693,43 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                        env = props.getProperty("Environment");
                        partner = props.getProperty("Partner");
                        routeOffer = props.getProperty("routeOffer");
-                       subContextPath = props.getProperty("SubContextPath")+fTopic;
-                       /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
-                               subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
-                       }*/                     
+                       subContextPath = props.getProperty("SubContextPath") + fTopic;
+                       /*
+                        * if(props.getProperty("partition")!=null &&
+                        * !props.getProperty("partition").equalsIgnoreCase("")){
+                        * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
+                        * "partition"); }
+                        */
                        protocol = props.getProperty("Protocol");
                        methodType = props.getProperty("MethodType");
                        dmeuser = props.getProperty("username");
                        dmepassword = props.getProperty("password");
                        contentType = props.getProperty("contenttype");
                        handlers = props.getProperty("sessionstickinessrequired");
-                       routerFilePath= props.getProperty("DME2preferredRouterFilePath");
-                       
+                       routerFilePath = props.getProperty("DME2preferredRouterFilePath");
+
                        /**
-                        * Changes to DME2Client url to use Partner for auto failover between data centers
-                        * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
+                        * Changes to DME2Client url to use Partner for auto failover
+                        * between data centers When Partner value is not provided use the
+                        * routeOffer value for auto failover within a cluster
                         */
-                       
 
                        String partitionKey = props.getProperty("partition");
-                       
-                       if (partner != null && !partner.isEmpty() ) 
-                       { 
-                               url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
-                if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
-                    url = url + "&partitionKey=" + partitionKey;
-                }
-                       }
-                       else if (routeOffer!=null && !routeOffer.isEmpty()) 
-                       { 
-                               url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
-                if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
-                    url = url + "&partitionKey=" + partitionKey;
-                }
+
+                       if (partner != null && !partner.isEmpty()) {
+                               url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
+                                               + partner;
+                               if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+                                       url = url + "&partitionKey=" + partitionKey;
+                               }
+                       } else if (routeOffer != null && !routeOffer.isEmpty()) {
+                               url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+                                               + routeOffer;
+                               if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+                                       url = url + "&partitionKey=" + partitionKey;
+                               }
                        }
-                        
+
                        DMETimeOuts = new HashMap<String, String>();
                        DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
                        DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
@@ -791,56 +737,56 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                        DMETimeOuts.put("Content-Type", contentType);
                        System.setProperty("AFT_LATITUDE", latitude);
                        System.setProperty("AFT_LONGITUDE", longitude);
-                       System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
-                       //System.setProperty("DME2.DEBUG", "true");
-               //      System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
-                       //System.out.println("XXXXXX"+url);
-                       
-                       //SSL changes
-                       System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-                                       "SSLv3,TLSv1,TLSv1.1");
+                       System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+                       // System.setProperty("DME2.DEBUG", "true");
+
+                       // SSL changes
+                       // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+                       // "SSLv3,TLSv1,TLSv1.1");
+                       System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
                        System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
                        System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-                       
-                       //SSL changes
-                       
+
+                       // SSL changes
+
                        sender = new DME2Client(new URI(url), 5000L);
-                               
+
                        sender.setAllowAllHttpReturnCodes(true);
                        sender.setMethod(methodType);
-                       sender.setSubContext(subContextPath);   
+                       sender.setSubContext(subContextPath);
                        sender.setCredentials(dmeuser, dmepassword);
                        sender.setHeaders(DMETimeOuts);
-                       if(handlers.equalsIgnoreCase("yes")){
-                               sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
-                               sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+                       if (handlers.equalsIgnoreCase("yes")) {
+                               sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+                                               props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+                               sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
+                                               props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
                                sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
-                               }else{
-                                       sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
-                               }
+                       } else {
+                               sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+                       }
                } catch (DME2Exception x) {
                        getLog().warn(x.getMessage(), x);
-                       throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
+                       throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
                } catch (URISyntaxException x) {
-                       
+
                        getLog().warn(x.getMessage(), x);
-                       throw new URISyntaxException(url,x.getMessage());
+                       throw new URISyntaxException(url, x.getMessage());
                } catch (Exception x) {
 
                        getLog().warn(x.getMessage(), x);
                        throw new Exception(x.getMessage());
                }
        }
-       
-       private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
-       {
-               super ( hosts );
 
-               if ( topic == null || topic.length() < 1 )
-               {
-                       throw new IllegalArgumentException ( "A topic must be provided." );
+       private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+                       boolean compress) throws MalformedURLException {
+               super(hosts);
+
+               if (topic == null || topic.length() < 1) {
+                       throw new IllegalArgumentException("A topic must be provided.");
                }
-               
+
                fHostSelector = new HostSelector(hosts, null);
                fClosed = false;
                fTopic = topic;
@@ -848,49 +794,45 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
                fMaxBatchAgeMs = maxBatchAgeMs;
                fCompress = compress;
 
-               fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+               fPending = new LinkedBlockingQueue<TimestampedMessage>();
                fDontSendUntilMs = 0;
-               fExec = new ScheduledThreadPoolExecutor ( 1 );
+               fExec = new ScheduledThreadPoolExecutor(1);
                pubResponse = new MRPublisherResponse();
-               
+
        }
-       
-       private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
-       {
-               super ( hosts );
 
-               if ( topic == null || topic.length() < 1 )
-               {
-                       throw new IllegalArgumentException ( "A topic must be provided." );
+       private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+                       boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
+               super(hosts);
+
+               if (topic == null || topic.length() < 1) {
+                       throw new IllegalArgumentException("A topic must be provided.");
                }
-               
+
                fHostSelector = new HostSelector(hosts, null);
                fClosed = false;
                fTopic = topic;
                fMaxBatchSize = maxBatchSize;
                fMaxBatchAgeMs = maxBatchAgeMs;
                fCompress = compress;
-               threadOccuranceTime=httpThreadOccurnace;
-               fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+               threadOccuranceTime = httpThreadOccurnace;
+               fPending = new LinkedBlockingQueue<TimestampedMessage>();
                fDontSendUntilMs = 0;
-               fExec = new ScheduledThreadPoolExecutor ( 1 );
-               fExec.scheduleAtFixedRate ( new Runnable()
-               {
+               fExec = new ScheduledThreadPoolExecutor(1);
+               fExec.scheduleAtFixedRate(new Runnable() {
                        @Override
-                       public void run ()
-                       {
-                               send ( false );
+                       public void run() {
+                               send(false);
                        }
-               }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
+               }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
        }
 
-       private static class TimestampedMessage extends message
-       {
-               public TimestampedMessage ( message m )
-               {
-                       super ( m );
+       private static class TimestampedMessage extends message {
+               public TimestampedMessage(message m) {
+                       super(m);
                        timestamp = Clock.now();
                }
+
                public final long timestamp;
        }
 
@@ -941,5 +883,5 @@ private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherR
        public void setAuthDate(String authDate) {
                this.authDate = authDate;
        }
-       
+
 }
index bdd15d4..2886db5 100644 (file)
@@ -38,14 +38,14 @@ public class SimpleExampleConsumer {
 
        private static final Logger logger = LoggerFactory.getLogger(SimpleExampleConsumer.class);
 
-    private SimpleExampleConsumer() {
-    }
+       private SimpleExampleConsumer() {
+       }
 
        public static void main(String[] args) {
 
                long count = 0;
                long nextReport = 5000;
-                String key;
+               String key;
 
                final long startMs = System.currentTimeMillis();
 
@@ -54,24 +54,24 @@ public class SimpleExampleConsumer {
                        final MRConsumer cc = MRClientFactory.createConsumer("D:\\SG\\consumer.properties");
                        while (true) {
                                for (String msg : cc.fetch()) {
-                                        logger.debug("Message Received: " + msg);
+                                       logger.debug("Message Received: " + msg);
                                }
                                // Header for DME2 Call.
                                MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
-                               for (MultivaluedMap.Entry<String,List<Object>> entry: headersMap.entrySet()) {
-                                    key = entry.getKey();
-                                    logger.debug("Header Key " + key);
-                                    logger.debug("Header Value " + headersMap.get(key));
+                               for (MultivaluedMap.Entry<String, List<Object>> entry : headersMap.entrySet()) {
+                                       key = entry.getKey();
+                                       logger.debug("Header Key " + key);
+                                       logger.debug("Header Value " + headersMap.get(key));
                                }
                                // Header for HTTP Call.
-                               
-                                Map<String, String> dme2headersMap=MRClientFactory.DME2HeadersMap;
-                                 for(Map.Entry<String,String> entry: dme2headersMap.entrySet()) {
-                                     key = entry.getKey();
-                                     logger.debug("Header Key " + key);
-                                     logger.debug("Header Value " + dme2headersMap.get(key));
-                                 }
-                                
+
+                               Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap;
+                               for (Map.Entry<String, String> entry : dme2headersMap.entrySet()) {
+                                       key = entry.getKey();
+                                       logger.debug("Header Key " + key);
+                                       logger.debug("Header Value " + dme2headersMap.get(key));
+                               }
+
                                if (count > nextReport) {
                                        nextReport += 5000;
 
@@ -79,11 +79,10 @@ public class SimpleExampleConsumer {
                                        final long elapsedMs = endMs - startMs;
                                        final double elapsedSec = elapsedMs / 1000.0;
                                        final double eps = count / elapsedSec;
-                                       logger.error("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
                                }
                        }
                } catch (Exception x) {
-                    logger.error(x.getClass().getName() + ": " + x.getMessage());
+                       logger.error(x.getClass().getName() + ": " + x.getMessage());
                }
        }
 }
index 6e86d47..a4a176e 100644 (file)
@@ -29,11 +29,9 @@ package com.att.nsa.mr.test.clients;
  *
  */
 public enum ProtocolTypeConstants {
-       
-       DME2("DME2"),
-       AAF_AUTH("HTTPAAF"),
-       AUTH_KEY("HTTPAUTH");
-       
+
+       DME2("DME2"), AAF_AUTH("HTTPAAF"), AUTH_KEY("HTTPAUTH"), HTTPNOAUTH("HTTPNOAUTH");
+
        private String value;
 
        private ProtocolTypeConstants(String value) {
index 5ae36d2..0e3ee5a 100644 (file)
@@ -33,57 +33,52 @@ import org.slf4j.LoggerFactory;
 import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.client.MRConsumer;
 
-public class SimpleExampleConsumer
-{
+public class SimpleExampleConsumer {
 
-       static FileWriter routeWriter= null;
-       static Properties props=null;   
-       static FileReader routeReader=null;
-       public static void main ( String[] args )
-       {
+       static FileWriter routeWriter = null;
+       static Properties props = null;
+       static FileReader routeReader = null;
+
+       public static void main(String[] args) {
                final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumer.class);
-       
+
                long count = 0;
                long nextReport = 5000;
 
-               final long startMs = System.currentTimeMillis ();
-                               
-               try
-               {
-                       String routeFilePath="/src/main/resources/dme2/preferredRoute.txt";
-                                                       
-                       
-                       File fo= new File(routeFilePath);
-                       if(!fo.exists()){
-                                       routeWriter=new FileWriter(new File (routeFilePath));
-                       }       
-                       routeReader= new FileReader(new File (routeFilePath));
-                       props= new Properties();
-                       final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" );
-                       while ( true )
-                       {
-                               for ( String msg : cc.fetch () )
-                               {
-                                       //System.out.println ( "" + (++count) + ": " + msg );
+               final long startMs = System.currentTimeMillis();
+
+               try {
+                       String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt";
+
+                       File fo = new File(routeFilePath);
+                       if (!fo.exists()) {
+                               routeWriter = new FileWriter(new File(routeFilePath));
+                       }
+                       routeReader = new FileReader(new File(routeFilePath));
+                       props = new Properties();
+                       final MRConsumer cc = MRClientFactory.createConsumer("/src/main/resources/dme2/consumer.properties");
+                       int i = 0;
+                       while (i < 10) {
+                               Thread.sleep(2);
+                               i++;
+                               for (String msg : cc.fetch()) {
+                                       // System.out.println ( "" + (++count) + ": " + msg );
                                        System.out.println(msg);
                                }
-       
-                               if ( count > nextReport )
-                               {
+
+                               if (count > nextReport) {
                                        nextReport += 5000;
-       
-                                       final long endMs = System.currentTimeMillis ();
+
+                                       final long endMs = System.currentTimeMillis();
                                        final long elapsedMs = endMs - startMs;
                                        final double elapsedSec = elapsedMs / 1000.0;
                                        final double eps = count / elapsedSec;
-                                       System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
+                                       System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
                                }
                        }
-               }
-               catch ( Exception x )
-               {
-                       System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
-                    LOG.error("exception: ", x);
+               } catch (Exception x) {
+                       System.err.println(x.getClass().getName() + ": " + x.getMessage());
+                       LOG.error("exception: ", x);
                }
        }
 }
index d6e413c..e1118ab 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=1
-patch=0
+patch=1
 
 base_version=${major}.${minor}.${patch}