[DMAAP-CLIENT] First sonar issues review part2
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientFactory.java
index 1780703..88d3dab 100644 (file)
@@ -5,12 +5,13 @@
  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
  *  ================================================================================
  *   Modifications Copyright © 2018 IBM.
- * ================================================================================
+ *   Modifications Copyright © 2021 Orange.
+ *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  ============LICENSE_END=========================================================
  *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ *
  *******************************************************************************/
+
 package org.onap.dmaap.mr.client;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeSet;
 import java.util.UUID;
 import javax.ws.rs.core.MultivaluedMap;
+
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
 import org.onap.dmaap.mr.client.impl.MRMetaClient;
 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.onap.dmaap.mr.tools.ValidatorUtil;
 
 /**
@@ -50,10 +59,12 @@ import org.onap.dmaap.mr.tools.ValidatorUtil;
  * instance.)<br/>
  * <br/>
  * Publishers
- * 
+ *
  * @author author
  */
 public class MRClientFactory {
+
+    private static final String ID = "id";
     private static final String AUTH_KEY = "authKey";
     private static final String AUTH_DATE = "authDate";
     private static final String PASSWORD = "password";
@@ -63,6 +74,36 @@ public class MRClientFactory {
     private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
     private static final String TOPIC = "topic";
     private static final String TRANSPORT_TYPE = "TransportType";
+    private static final String MAX_BATCH_SIZE = "maxBatchSize";
+    private static final String MAX_AGE_MS = "maxAgeMs";
+    private static final String MESSAGE_SENT_THREAD_OCCURRENCE_OLD = "MessageSentThreadOccurance";
+    private static final String MESSAGE_SENT_THREAD_OCCURRENCE = "MessageSentThreadOccurrence";
+    private static final String GROUP = "group";
+    private static final String SERVICE_NAME = "ServiceName";
+    private static final String PARTNER = "Partner";
+    private static final String ROUTE_OFFER = "routeOffer";
+    private static final String PROTOCOL = "Protocol";
+    private static final String METHOD_TYPE = "MethodType";
+    private static final String CONTENT_TYPE = "contenttype";
+    private static final String LATITUDE = "Latitude";
+    private static final String LONGITUDE = "Longitude";
+    private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
+    private static final String VERSION = "Version";
+    private static final String ENVIRONMENT = "Environment";
+    private static final String SUB_CONTEXT_PATH = "SubContextPath";
+    private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
+    private static final String PARTITION = "partition";
+    private static final String COMPRESS = "compress";
+    private static final String TIMEOUT = "timeout";
+    private static final String LIMIT = "limit";
+    private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
+    private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
+    private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
+    private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
+    private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
+    private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
+    private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS";
+    private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS";
 
     private static MultivaluedMap<String, Object> httpHeadersMap;
     public static Map<String, String> DME2HeadersMap;
@@ -82,6 +123,7 @@ public class MRClientFactory {
 
     /**
      * Add getter to avoid direct access to static header map.
+     *
      * @return
      */
     public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
@@ -90,6 +132,7 @@ public class MRClientFactory {
 
     /**
      * Add setter to avoid direct access to static header map.
+     *
      * @param headers
      */
     public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
@@ -100,15 +143,11 @@ public class MRClientFactory {
      * Create a consumer instance with the default timeout and no limit on
      * messages returned. This consumer operates as an independent consumer
      * (i.e., not in a group) and is NOT re-startable across sessions.
-     * 
-     * @param hostList
-     *            A comma separated list of hosts to use to connect to MR. You
-     *            can include port numbers (3904 is the default). For example,
-     *            "hostname:8080,"
-     * 
-     * @param topic
-     *            The topic to consume
-     * 
+     *
+     * @param hostList A comma separated list of hosts to use to connect to MR. You
+     *                 can include port numbers (3904 is the default). For example,
+     *                 "hostname:8080,"
+     * @param topic    The topic to consume
      * @return a consumer
      */
     public static MRConsumer createConsumer(String hostList, String topic) {
@@ -119,12 +158,9 @@ public class MRClientFactory {
      * Create a consumer instance with the default timeout and no limit on
      * messages returned. This consumer operates as an independent consumer
      * (i.e., not in a group) and is NOT re-startable across sessions.
-     * 
-     * @param hostSet
-     *            The host used in the URL to MR. Entries can be "host:port".
-     * @param topic
-     *            The topic to consume
-     * 
+     *
+     * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+     * @param topic   The topic to consume
      * @return a consumer
      */
     public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
@@ -136,14 +172,10 @@ public class MRClientFactory {
      * timeout, and no limit on messages returned. This consumer operates as an
      * independent consumer (i.e., not in a group) and is NOT re-startable
      * across sessions.
-     * 
-     * @param hostSet
-     *            The host used in the URL to MR. Entries can be "host:port".
-     * @param topic
-     *            The topic to consume
-     * @param filter
-     *            a filter to use on the server side
-     * 
+     *
+     * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+     * @param topic   The topic to consume
+     * @param filter  a filter to use on the server side
      * @return a consumer
      */
     public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
@@ -155,20 +187,15 @@ public class MRClientFactory {
      * messages returned. This consumer can operate in a logical group and is
      * re-startable across sessions when you use the same group and ID on
      * restart.
-     * 
-     * @param hostSet
-     *            The host used in the URL to MR. Entries can be "host:port".
-     * @param topic
-     *            The topic to consume
-     * @param consumerGroup
-     *            The name of the consumer group this consumer is part of
-     * @param consumerId
-     *            The unique id of this consume in its group
-     * 
+     *
+     * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
+     * @param topic         The topic to consume
+     * @param consumerGroup The name of the consumer group this consumer is part of
+     * @param consumerId    The unique id of this consume in its group
      * @return a consumer
      */
     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
-            final String consumerId) {
+                                            final String consumerId) {
         return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
     }
 
@@ -177,27 +204,20 @@ public class MRClientFactory {
      * messages returned. This consumer can operate in a logical group and is
      * re-startable across sessions when you use the same group and ID on
      * restart.
-     * 
-     * @param hostSet
-     *            The host used in the URL to MR. Entries can be "host:port".
-     * @param topic
-     *            The topic to consume
-     * @param consumerGroup
-     *            The name of the consumer group this consumer is part of
-     * @param consumerId
-     *            The unique id of this consume in its group
-     * @param timeoutMs
-     *            The amount of time in milliseconds that the server should keep
-     *            the connection open while waiting for message traffic. Use -1
-     *            for default timeout.
-     * @param limit
-     *            A limit on the number of messages returned in a single call.
-     *            Use -1 for no limit.
-     * 
+     *
+     * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
+     * @param topic         The topic to consume
+     * @param consumerGroup The name of the consumer group this consumer is part of
+     * @param consumerId    The unique id of this consume in its group
+     * @param timeoutMs     The amount of time in milliseconds that the server should keep
+     *                      the connection open while waiting for message traffic. Use -1
+     *                      for default timeout.
+     * @param limit         A limit on the number of messages returned in a single call.
+     *                      Use -1 for no limit.
      * @return a consumer
      */
     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
-            final String consumerId, int timeoutMs, int limit) {
+                                            final String consumerId, int timeoutMs, int limit) {
         return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
     }
 
@@ -206,31 +226,23 @@ public class MRClientFactory {
      * messages returned. This consumer can operate in a logical group and is
      * re-startable across sessions when you use the same group and ID on
      * restart. This consumer also uses server-side filtering.
-     * 
-     * @param hostList
-     *            A comma separated list of hosts to use to connect to MR. You
-     *            can include port numbers (3904 is the default)"
-     * @param topic
-     *            The topic to consume
-     * @param consumerGroup
-     *            The name of the consumer group this consumer is part of
-     * @param consumerId
-     *            The unique id of this consume in its group
-     * @param timeoutMs
-     *            The amount of time in milliseconds that the server should keep
-     *            the connection open while waiting for message traffic. Use -1
-     *            for default timeout.
-     * @param limit
-     *            A limit on the number of messages returned in a single call.
-     *            Use -1 for no limit.
-     * @param filter
-     *            A Highland Park filter expression using only built-in filter
-     *            components. Use null for "no filter".
-     * 
+     *
+     * @param hostList      A comma separated list of hosts to use to connect to MR. You
+     *                      can include port numbers (3904 is the default)"
+     * @param topic         The topic to consume
+     * @param consumerGroup The name of the consumer group this consumer is part of
+     * @param consumerId    The unique id of this consume in its group
+     * @param timeoutMs     The amount of time in milliseconds that the server should keep
+     *                      the connection open while waiting for message traffic. Use -1
+     *                      for default timeout.
+     * @param limit         A limit on the number of messages returned in a single call.
+     *                      Use -1 for no limit.
+     * @param filter        A Highland Park filter expression using only built-in filter
+     *                      components. Use null for "no filter".
      * @return a consumer
      */
     public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
-            final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+                                            final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
         return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
                 filter, apiKey, apiSecret);
     }
@@ -240,32 +252,25 @@ public class MRClientFactory {
      * messages returned. This consumer can operate in a logical group and is
      * re-startable across sessions when you use the same group and ID on
      * restart. This consumer also uses server-side filtering.
-     * 
-     * @param hostSet
-     *            The host used in the URL to MR. Entries can be "host:port".
-     * @param topic
-     *            The topic to consume
-     * @param consumerGroup
-     *            The name of the consumer group this consumer is part of
-     * @param consumerId
-     *            The unique id of this consume in its group
-     * @param timeoutMs
-     *            The amount of time in milliseconds that the server should keep
-     *            the connection open while waiting for message traffic. Use -1
-     *            for default timeout.
-     * @param limit
-     *            A limit on the number of messages returned in a single call.
-     *            Use -1 for no limit.
-     * @param filter
-     *            A Highland Park filter expression using only built-in filter
-     *            components. Use null for "no filter".
-     * 
+     *
+     * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
+     * @param topic         The topic to consume
+     * @param consumerGroup The name of the consumer group this consumer is part of
+     * @param consumerId    The unique id of this consume in its group
+     * @param timeoutMs     The amount of time in milliseconds that the server should keep
+     *                      the connection open while waiting for message traffic. Use -1
+     *                      for default timeout.
+     * @param limit         A limit on the number of messages returned in a single call.
+     *                      Use -1 for no limit.
+     * @param filter        A Highland Park filter expression using only built-in filter
+     *                      components. Use null for "no filter".
      * @return a consumer
      */
     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
-            final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
-        if (MRClientBuilders.sfConsumerMock != null)
+                                            final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+        if (MRClientBuilders.sfConsumerMock != null) {
             return MRClientBuilders.sfConsumerMock;
+        }
         try {
             return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
                     .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
@@ -277,21 +282,19 @@ public class MRClientFactory {
         }
     }
 
-    /*************************************************************************/
-    /*************************************************************************/
-    /*************************************************************************/
+    //*************************************************************************
+    //*************************************************************************
+    //*************************************************************************
 
     /**
      * Create a publisher that sends each message (or group of messages)
      * immediately. Most applications should favor higher latency for much
      * higher message throughput and the "simple publisher" is not a good
      * choice.
-     * 
-     * @param hostlist
-     *            The host used in the URL to MR. Can be "host:port", can be
-     *            multiple comma-separated entries.
-     * @param topic
-     *            The topic on which to publish messages.
+     *
+     * @param hostlist The host used in the URL to MR. Can be "host:port", can be
+     *                 multiple comma-separated entries.
+     * @param topic    The topic on which to publish messages.
      * @return a publisher
      */
     public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
@@ -302,94 +305,69 @@ public class MRClientFactory {
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown. Message payloads are
      * not compressed.
-     * 
-     * @param hostlist
-     *            The host used in the URL to MR. Can be "host:port", can be
-     *            multiple comma-separated entries.
-     * @param topic
-     *            The topic on which to publish messages.
-     * @param maxBatchSize
-     *            The largest set of messages to batch
-     * @param maxAgeMs
-     *            The maximum age of a message waiting in a batch
-     * 
+     *
+     * @param hostlist     The host used in the URL to MR. Can be "host:port", can be
+     *                     multiple comma-separated entries.
+     * @param topic        The topic on which to publish messages.
+     * @param maxBatchSize The largest set of messages to batch
+     * @param maxAgeMs     The maximum age of a message waiting in a batch
      * @return a publisher
      */
     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
-            long maxAgeMs) {
+                                                              long maxAgeMs) {
         return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
     }
 
     /**
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown.
-     * 
-     * @param hostlist
-     *            The host used in the URL to MR. Can be "host:port", can be
-     *            multiple comma-separated entries.
-     * @param topic
-     *            The topic on which to publish messages.
-     * @param maxBatchSize
-     *            The largest set of messages to batch
-     * @param maxAgeMs
-     *            The maximum age of a message waiting in a batch
-     * @param compress
-     *            use gzip compression
-     * 
+     *
+     * @param hostlist     The host used in the URL to MR. Can be "host:port", can be
+     *                     multiple comma-separated entries.
+     * @param topic        The topic on which to publish messages.
+     * @param maxBatchSize The largest set of messages to batch
+     * @param maxAgeMs     The maximum age of a message waiting in a batch
+     * @param compress     use gzip compression
      * @return a publisher
      */
     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
-            long maxAgeMs, boolean compress) {
+                                                              long maxAgeMs, boolean compress) {
         return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
     }
 
     /**
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown.
-     * 
-     * @param hostSet
-     *            A set of hosts to be used in the URL to MR. Can be
-     *            "host:port". Use multiple entries to enable failover.
-     * @param topic
-     *            The topic on which to publish messages.
-     * @param maxBatchSize
-     *            The largest set of messages to batch
-     * @param maxAgeMs
-     *            The maximum age of a message waiting in a batch
-     * @param compress
-     *            use gzip compression
-     * 
+     *
+     * @param hostSet      A set of hosts to be used in the URL to MR. Can be
+     *                     "host:port". Use multiple entries to enable failover.
+     * @param topic        The topic on which to publish messages.
+     * @param maxBatchSize The largest set of messages to batch
+     * @param maxAgeMs     The maximum age of a message waiting in a batch
+     * @param compress     use gzip compression
      * @return a publisher
      */
     public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
-            long maxAgeMs, boolean compress) {
+                                                              long maxAgeMs, boolean compress) {
         final TreeSet<String> hosts = new TreeSet<>();
-        for (String hp : hostSet) {
-            hosts.add(hp);
-        }
+        Collections.addAll(hosts, hostSet);
         return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
     }
 
     /**
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown.
-     * 
-     * @param hostSet
-     *            A set of hosts to be used in the URL to MR. Can be
-     *            "host:port". Use multiple entries to enable failover.
-     * @param topic
-     *            The topic on which to publish messages.
-     * @param maxBatchSize
-     *            The largest set of messages to batch
-     * @param maxAgeMs
-     *            The maximum age of a message waiting in a batch
-     * @param compress
-     *            use gzip compression
-     * 
+     *
+     * @param hostSet      A set of hosts to be used in the URL to MR. Can be
+     *                     "host:port". Use multiple entries to enable failover.
+     * @param topic        The topic on which to publish messages.
+     * @param maxBatchSize The largest set of messages to batch
+     * @param maxAgeMs     The maximum age of a message waiting in a batch
+     * @param compress     use gzip compression
      * @return a publisher
      */
     public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
-            int maxBatchSize, long maxAgeMs, boolean compress) {
+                                                              int maxBatchSize, long maxAgeMs, boolean compress) {
         return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
                 .compress(compress).build();
     }
@@ -397,28 +375,20 @@ public class MRClientFactory {
     /**
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown.
-     * 
-     * @param host
-     *            A host to be used in the URL to MR. Can be "host:port". Use
-     *            multiple entries to enable failover.
-     * @param topic
-     *            The topic on which to publish messages.
-     * @param username
-     *            username
-     * @param password
-     *            password
-     * @param maxBatchSize
-     *            The largest set of messages to batch
-     * @param maxAgeMs
-     *            The maximum age of a message waiting in a batch
-     * @param compress
-     *            use gzip compression
-     * @param protocolFlag
-     *            http auth or ueb auth or dme2 method
+     *
+     * @param host         A host to be used in the URL to MR. Can be "host:port". Use
+     *                     multiple entries to enable failover.
+     * @param topic        The topic on which to publish messages.
+     * @param username     username
+     * @param password     password
+     * @param maxBatchSize The largest set of messages to batch
+     * @param maxAgeMs     The maximum age of a message waiting in a batch
+     * @param compress     use gzip compression
+     * @param protocolFlag http auth or ueb auth or dme2 method
      * @return MRBatchingPublisher obj
      */
     public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
-            final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
+                                                              final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
         MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
                 .compress(compress).build();
@@ -433,14 +403,11 @@ public class MRClientFactory {
     /**
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown
-     * 
-     * @param props
-     *            props set all properties for publishing message
+     *
+     * @param props props set all properties for publishing message
      * @return MRBatchingPublisher obj
-     * @throws FileNotFoundException
-     *             exc
-     * @throws IOException
-     *             ioex
+     * @throws FileNotFoundException exc
+     * @throws IOException           ioex
      */
     public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
             throws FileNotFoundException, IOException {
@@ -450,14 +417,11 @@ public class MRClientFactory {
     /**
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown
-     * 
-     * @param props
-     *            props set all properties for publishing message
+     *
+     * @param props props set all properties for publishing message
      * @return MRBatchingPublisher obj
-     * @throws FileNotFoundException
-     *             exc
-     * @throws IOException
-     *             ioex
+     * @throws FileNotFoundException exc
+     * @throws IOException           ioex
      */
     public static MRBatchingPublisher createBatchingPublisher(Properties props)
             throws FileNotFoundException, IOException {
@@ -467,19 +431,16 @@ public class MRClientFactory {
     /**
      * Create a publisher that batches messages. Be sure to close the publisher
      * to send the last batch and ensure a clean shutdown
-     * 
-     * @param producerFilePath
-     *            set all properties for publishing message
+     *
+     * @param producerFilePath set all properties for publishing message
      * @return MRBatchingPublisher obj
-     * @throws FileNotFoundException
-     *             exc
-     * @throws IOException
-     *             ioex
+     * @throws FileNotFoundException exc
+     * @throws IOException           ioex
      */
     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
             throws FileNotFoundException, IOException {
         Properties props = new Properties();
-        try(InputStream input = new FileInputStream(producerFilePath)) {
+        try (InputStream input = new FileInputStream(producerFilePath)) {
             props.load(input);
         }
         return createBatchingPublisher(props);
@@ -488,19 +449,16 @@ public class MRClientFactory {
     /**
      * Create a publisher that will contain send methods that return response
      * object to user.
-     * 
-     * @param producerFilePath
-     *            set all properties for publishing message
+     *
+     * @param producerFilePath set all properties for publishing message
      * @return MRBatchingPublisher obj
-     * @throws FileNotFoundException
-     *             exc
-     * @throws IOException
-     *             ioex
+     * @throws FileNotFoundException exc
+     * @throws IOException           ioex
      */
     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
             throws FileNotFoundException, IOException {
         Properties props = new Properties();
-        try(InputStream input = new FileInputStream(producerFilePath)) {
+        try (InputStream input = new FileInputStream(producerFilePath)) {
             props.load(input);
         }
         return createBatchingPublisher(props, withResponse);
@@ -510,26 +468,32 @@ public class MRClientFactory {
             throws FileNotFoundException, IOException {
         assert props != null;
         MRSimplerBatchPublisher pub;
+
+        String messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE);
+        if (messageSentThreadOccurrence == null || messageSentThreadOccurrence.isEmpty()) {
+            messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE_OLD);
+        }
+
         if (withResponse) {
             pub = new MRSimplerBatchPublisher.Builder()
-                    .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+                    .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
                     .onTopic(props.getProperty(TOPIC))
-                    .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
-                            Integer.parseInt(props.getProperty("maxAgeMs").toString()))
-                    .compress(Boolean.parseBoolean(props.getProperty("compress")))
-                    .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+                    .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
+                            Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
+                    .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
+                    .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence))
                     .withResponse(withResponse).build();
         } else {
             pub = new MRSimplerBatchPublisher.Builder()
-                    .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
+                    .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
                     .onTopic(props.getProperty(TOPIC))
-                    .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
-                            Integer.parseInt(props.getProperty("maxAgeMs").toString()))
-                    .compress(Boolean.parseBoolean(props.getProperty("compress")))
-                    .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+                    .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
+                            Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
+                    .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
+                    .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)).build();
         }
         pub.setHost(props.getProperty(HOST));
-        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
 
             pub.setAuthKey(props.getProperty(AUTH_KEY));
             pub.setAuthDate(props.getProperty(AUTH_DATE));
@@ -542,7 +506,7 @@ public class MRClientFactory {
         pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
         pub.setProps(props);
         prop = new Properties();
-        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
             routeReader = new FileReader(new File(routeFilePath));
             File fo = new File(routeFilePath);
@@ -555,14 +519,11 @@ public class MRClientFactory {
 
     /**
      * Create an identity manager client to work with API keys.
-     * 
-     * @param hostSet
-     *            A set of hosts to be used in the URL to MR. Can be
-     *            "host:port". Use multiple entries to enable failover.
-     * @param apiKey
-     *            Your API key
-     * @param apiSecret
-     *            Your API secret
+     *
+     * @param hostSet   A set of hosts to be used in the URL to MR. Can be
+     *                  "host:port". Use multiple entries to enable failover.
+     * @param apiKey    Your API key
+     * @param apiSecret Your API secret
      * @return an identity manager
      */
     public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
@@ -578,14 +539,11 @@ public class MRClientFactory {
 
     /**
      * Create a topic manager for working with topics.
-     * 
-     * @param hostSet
-     *            A set of hosts to be used in the URL to MR. Can be
-     *            "host:port". Use multiple entries to enable failover.
-     * @param apiKey
-     *            Your API key
-     * @param apiSecret
-     *            Your API secret
+     *
+     * @param hostSet   A set of hosts to be used in the URL to MR. Can be
+     *                  "host:port". Use multiple entries to enable failover.
+     * @param apiKey    Your API key
+     * @param apiSecret Your API secret
      * @return a topic manager
      */
     public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
@@ -601,7 +559,7 @@ public class MRClientFactory {
 
     /**
      * Inject a consumer. Used to support unit tests.
-     * 
+     *
      * @param cc
      */
     public static void $testInject(MRConsumer cc) {
@@ -609,13 +567,13 @@ public class MRClientFactory {
     }
 
     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
-            String id, int i, int j, String protocalFlag, String consumerFilePath) {
+                                            String id, int timeout, int limit, String protocalFlag, String consumerFilePath) {
 
         MRConsumerImpl sub;
         try {
             sub = new MRConsumerImpl.MRConsumerImplBuilder()
                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
-                    .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
+                    .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
                     .createMRConsumerImpl();
         } catch (MalformedURLException e) {
@@ -631,13 +589,13 @@ public class MRClientFactory {
     }
 
     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
-            String id, String protocalFlag, String consumerFilePath, int i, int j) {
+                                            String id, String protocalFlag, String consumerFilePath, int timeout, int limit) {
 
         MRConsumerImpl sub;
         try {
             sub = new MRConsumerImpl.MRConsumerImplBuilder()
                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
-                    .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
+                    .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
                     .createMRConsumerImpl();
         } catch (MalformedURLException e) {
@@ -654,7 +612,7 @@ public class MRClientFactory {
 
     public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
         Properties props = new Properties();
-        try(InputStream input = new FileInputStream(consumerFilePath)) {
+        try (InputStream input = new FileInputStream(consumerFilePath)) {
             props.load(input);
         }
         return createConsumer(props);
@@ -663,26 +621,29 @@ public class MRClientFactory {
     public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
         int timeout;
         ValidatorUtil.validateSubscriber(props);
-        if (props.getProperty("timeout") != null)
-            timeout = Integer.parseInt(props.getProperty("timeout"));
-        else
+        if (props.getProperty(TIMEOUT) != null) {
+            timeout = Integer.parseInt(props.getProperty(TIMEOUT));
+        } else {
             timeout = -1;
+        }
         int limit;
-        if (props.getProperty("limit") != null)
-            limit = Integer.parseInt(props.getProperty("limit"));
-        else
+        if (props.getProperty(LIMIT) != null) {
+            limit = Integer.parseInt(props.getProperty(LIMIT));
+        } else {
             limit = -1;
+        }
         String group;
-        if (props.getProperty("group") == null)
+        if (props.getProperty(GROUP) == null) {
             group = UUID.randomUUID().toString();
-        else
-            group = props.getProperty("group");
+        } else {
+            group = props.getProperty(GROUP);
+        }
         MRConsumerImpl sub = null;
-        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
             sub = new MRConsumerImpl.MRConsumerImplBuilder()
                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
-                    .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
+                    .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
                     .setFilter(props.getProperty(FILTER))
                     .setApiKey_username(props.getProperty(AUTH_KEY))
                     .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
@@ -694,29 +655,29 @@ public class MRClientFactory {
             sub = new MRConsumerImpl.MRConsumerImplBuilder()
                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
-                    .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
+                    .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
                     .setFilter(props.getProperty(FILTER))
                     .setApiKey_username(props.getProperty(USERNAME))
                     .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
             sub.setUsername(props.getProperty(USERNAME));
             sub.setPassword(props.getProperty(PASSWORD));
         }
-        
+
         sub.setProps(props);
         sub.setHost(props.getProperty(HOST));
         sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
         sub.setfFilter(props.getProperty(FILTER));
-        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+        if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
             MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
             routeReader = new FileReader(new File(routeFilePath));
             prop = new Properties();
             File fo = new File(routeFilePath);
-                if (!fo.exists()) {
-                    routeWriter = new FileWriter(new File(routeFilePath));
-                }
+            if (!fo.exists()) {
+                routeWriter = new FileWriter(new File(routeFilePath));
+            }
         }
-        
+
         return sub;
     }
 }