Fix various Sonar issues in MRConsumerImpl 99/56099/3
authorLee, Tian (tl5884) <TianL@amdocs.com>
Tue, 10 Jul 2018 13:15:52 +0000 (14:15 +0100)
committerLee, Tian (tl5884) <TianL@amdocs.com>
Tue, 10 Jul 2018 13:48:25 +0000 (14:48 +0100)
Amendment 1: Restructure order of methods and fields according to Java convention.
Amendment 2: Remove bug fix that was accidentally included. This will be committed in a separate commit.

These changes do not affect the original functionality of the class.

Change-Id: I974a5e3fa94ca184704c4e26636bcbbfb5ce3320
Issue-ID: DMAAP-540
Signed-off-by: Lee, Tian (tl5884) <TianL@amdocs.com>
src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java

index faa81ce..4f5907f 100644 (file)
@@ -8,7 +8,7 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+ * 
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  ============LICENSE_END=========================================================
  *
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
+ * 
  *******************************************************************************/
 package com.att.nsa.mr.client.impl;
 
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
+import com.att.nsa.mr.client.HostSelector;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -34,7 +41,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.http.HttpException;
 import org.apache.http.HttpStatus;
 import org.json.JSONArray;
@@ -44,738 +50,618 @@ import org.json.JSONTokener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.att.aft.dme2.api.DME2Client;
-import com.att.aft.dme2.api.DME2Exception;
-import com.att.nsa.mr.client.HostSelector;
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-
 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
 
-       private static final String SUCCESS_MESSAGE = "Success";
-
-       private Logger log = LoggerFactory.getLogger(this.getClass().getName());
-
-       public static List<String> stringToList(String str) {
-               final LinkedList<String> set = new LinkedList<String>();
-               if (str != null) {
-                       final String[] parts = str.trim().split(",");
-                       for (String part : parts) {
-                               final String trimmed = part.trim();
-                               if (trimmed.length() > 0) {
-                                       set.add(trimmed);
-                               }
-                       }
-               }
-               return set;
-       }
-
-       public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
-                       final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
-                       String apiSecret_password) throws MalformedURLException {
-               this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
-                               false);
-       }
-
-       public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
-                       final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
-                       boolean allowSelfSignedCerts) throws MalformedURLException {
-               super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
-
-               fTopic = topic;
-               fGroup = consumerGroup;
-               fId = consumerId;
-               fTimeoutMs = timeoutMs;
-               fLimit = limit;
-               fFilter = filter;
-
-               fHostSelector = new HostSelector(hostPart);
-       }
-
-       @Override
-       public Iterable<String> fetch() throws IOException, Exception {
-               // fetch with the timeout and limit set in constructor
-               return fetch(fTimeoutMs, fLimit);
-       }
-
-       @Override
-       public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
-               final LinkedList<String> msgs = new LinkedList<String>();
-
-               // FIXME: the timeout on the socket needs to be at least as long as the
-               // long poll
-               // // sanity check for long poll timeout vs. socket read timeout
-               // final int maxReasonableTimeoutMs =
-               // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
-               // if ( timeoutMs > maxReasonableTimeoutMs )
-               // {
-               // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
-               // socket read timeout (" +
-               // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
-               // timeout to " + maxReasonableTimeoutMs + "." );
-               // timeoutMs = maxReasonableTimeoutMs;
-               // }
-
-               // final String urlPath = createUrlPath ( timeoutMs, limit );
-
-               // getLog().info ( "UEB GET " + urlPath );
-               try {
-                       if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-                               DMEConfigure(timeoutMs, limit);
-                               try {
-                                       // getLog().info ( "Receiving msgs from: " +
-                                       // url+subContextPath );
-                                       long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs)  ? dme2ReplyHandlerTimeoutMs :  (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); 
-                                       //String reply = sender.sendAndWait(timeout);   
-                                       String reply = sender.sendAndWait(timeoutMs + 10000L);
-                                       final JSONObject o = getResponseDataInJson(reply);
-                                       // msgs.add(reply);
-                                       if (o != null) {
-                                               final JSONArray a = o.getJSONArray("result");
-                                               // final int b = o.getInt("status" );
-                                               // if ( a != null && a.length()>0 )
-                                               if (a != null) {
-                                                       for (int i = 0; i < a.length(); i++) {
-                                                               // msgs.add("DMAAP response status:
-                                                               // "+Integer.toString(b));
-                                                               if (a.get(i) instanceof String)
-                                                                       msgs.add(a.getString(i));
-                                                               else
-                                                                       msgs.add(a.getJSONObject(i).toString());
-
-                                                       }
-                                               }
-                                               // else if(a != null && a.length()<1){
-                                               // msgs.add ("[]");
-                                               // }
-                                       }
-                               } catch (JSONException e) {
-                                       // unexpected response
-                                       reportProblemWithResponse();
-                                       log.error("exception: ", e);
-                               } catch (HttpException e) {
-                                       throw new IOException(e);
-                               }
-                       }
-
-                       if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               // final String urlPath = createUrlPath
-                               // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
-                               // fId,props.getProperty("Protocol")), timeoutMs, limit );
-                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
-                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
-                               try {
-                                       final JSONObject o = get(urlPath, username, password, protocolFlag);
-
-                                       if (o != null) {
-                                               final JSONArray a = o.getJSONArray("result");
-                                               final int b = o.getInt("status");
-                                               // if ( a != null && a.length()>0 )
-                                               if (a != null) {
-                                                       for (int i = 0; i < a.length(); i++) {
-                                                               // msgs.add("DMAAP response status:
-                                                               // "+Integer.toString(b));
-                                                               if (a.get(i) instanceof String)
-                                                                       msgs.add(a.getString(i));
-                                                               else
-                                                                       msgs.add(a.getJSONObject(i).toString());
-
-                                                       }
-                                               }
-                                               // else if(a != null && a.length()<1)
-                                               // {
-                                               // msgs.add ("[]");
-                                               // }
-                                       }
-                               } catch (JSONException e) {
-                                       // unexpected response
-                                       reportProblemWithResponse();
-                                       log.error("exception: ", e);
-                               } catch (HttpException e) {
-                                       throw new IOException(e);
-                               }
-                       }
-
-                       if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               final String urlPath = createUrlPath(
-                                               MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
-                                               timeoutMs, limit);
-
-                               try {
-                                       final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
-                                       if (o != null) {
-                                               final JSONArray a = o.getJSONArray("result");
-                                               final int b = o.getInt("status");
-                                               // if ( a != null && a.length()>0)
-                                               if (a != null) {
-                                                       for (int i = 0; i < a.length(); i++) {
-                                                               // msgs.add("DMAAP response status:
-                                                               // "+Integer.toString(b));
-                                                               if (a.get(i) instanceof String)
-                                                                       msgs.add(a.getString(i));
-                                                               else
-                                                                       msgs.add(a.getJSONObject(i).toString());
-
-                                                       }
-                                               }
-                                               // else if(a != null && a.length()<1){
-                                               // msgs.add ("[]");
-                                               // }
-                                       }
-                               } catch (JSONException e) {
-                                       // unexpected response
-                                       reportProblemWithResponse();
-                                       log.error("exception: ", e);
-                               } catch (HttpException e) {
-                                       throw new IOException(e);
-                               }
-
-                       }
-                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
-                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
-                               try {
-                                       final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
-                                       if (o != null) {
-                                               final JSONArray a = o.getJSONArray("result");
-                                               final int b = o.getInt("status");
-                                               // if ( a != null && a.length()>0)
-                                               if (a != null) {
-                                                       for (int i = 0; i < a.length(); i++) {
-                                                               // msgs.add("DMAAP response status:
-                                                               // "+Integer.toString(b));
-                                                               if (a.get(i) instanceof String)
-                                                                       msgs.add(a.getString(i));
-                                                               else
-                                                                       msgs.add(a.getJSONObject(i).toString());
-
-                                                       }
-                                               }
-
-                                       }
-                               } catch (JSONException e) {
-                                       // unexpected response
-                                       reportProblemWithResponse();
-                                       log.error("exception: ", e);
-                               } catch (HttpException e) {
-                                       throw new IOException(e);
-                               }
-
-                       }
-
-               } catch (JSONException e) {
-                       // unexpected response
-                       reportProblemWithResponse();
-                       log.error("exception: ", e);
-               } catch (HttpException e) {
-                       throw new IOException(e);
-               } catch (Exception e) {
-                       throw e;
-               }
-
-               return msgs;
-       }
-
-       private JSONObject getResponseDataInJson(String response) {
-               try {
-
-                       // log.info("DMAAP response status: " + response.getStatus());
-
-                       // final String responseData = response.readEntity(String.class);
-                       JSONTokener jsonTokener = new JSONTokener(response);
-                       JSONObject jsonObject = null;
-                       final char firstChar = jsonTokener.next();
-                       jsonTokener.back();
-                       if ('[' == firstChar) {
-                               JSONArray jsonArray = new JSONArray(jsonTokener);
-                               jsonObject = new JSONObject();
-                               jsonObject.put("result", jsonArray);
-                       } else {
-                               jsonObject = new JSONObject(jsonTokener);
-                       }
-
-                       return jsonObject;
-               } catch (JSONException excp) {
-                        log.error("DMAAP - Error reading response data.", excp);
-                       return null;
-               }
-
-       }
-
-       private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
-               JSONTokener jsonTokener = new JSONTokener(response);
-               JSONObject jsonObject = null;
-               final char firstChar = jsonTokener.next();
-               jsonTokener.back();
-               if (null != response && response.length() == 0) {
-                       return null;
-               }
-
-               if ('[' == firstChar) {
-                       JSONArray jsonArray = new JSONArray(jsonTokener);
-                       jsonObject = new JSONObject();
-                       jsonObject.put("result", jsonArray);
-               } else if ('{' == firstChar) {
-                       return null;
-               } else if ('<' == firstChar) {
-                       return null;
-               } else {
-                       jsonObject = new JSONObject(jsonTokener);
-               }
-
-               return jsonObject;
-
-       }
-
-       private final String fTopic;
-       private final String fGroup;
-       private final String fId;
-       private final int fTimeoutMs;
-       private final int fLimit;
-       private String fFilter;
-       private String username;
-       private String password;
-       private String host;
-       HostSelector fHostSelector = null;
-       private String latitude;
-       private String longitude;
-       private String version;
-       private String serviceName;
-       private String env;
-       private String partner;
-       private String routeOffer;
-       private String subContextPath;
-       private String protocol;
-       private String methodType;
-       private String url;
-       private String dmeuser;
-       private String dmepassword;
-       private String contenttype;
-       private DME2Client sender;
-       public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
-       public String consumerFilePath;
-       private String authKey;
-       private String authDate;
-       private Properties props;
-       private HashMap<String, String> DMETimeOuts;
-       private String handlers;
-       public static final String routerFilePath = null;
-       private long dme2ReplyHandlerTimeoutMs;
-       private long longPollingMs;
-       private static final long defaultDme2PerEndPointTimeoutMs = 10000L;
-       private static final long defaultDme2ReplyHandlerTimeoutMs = 10000L;
-       
-
-       public static String getRouterFilePath() {
-               return routerFilePath;
-       }
-
-       public static void setRouterFilePath(String routerFilePath) {
-               MRSimplerBatchPublisher.routerFilePath = routerFilePath;
-       }
-
-       public String getConsumerFilePath() {
-               return consumerFilePath;
-       }
-
-       public void setConsumerFilePath(String consumerFilePath) {
-               this.consumerFilePath = consumerFilePath;
-       }
-
-       public String getProtocolFlag() {
-               return protocolFlag;
-       }
-
-       public void setProtocolFlag(String protocolFlag) {
-               this.protocolFlag = protocolFlag;
-       }
-
-       private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
-               this.longPollingMs = timeoutMs;
-               latitude = props.getProperty("Latitude");
-               longitude = props.getProperty("Longitude");
-               version = props.getProperty("Version");
-               serviceName = props.getProperty("ServiceName");
-               env = props.getProperty("Environment");
-               partner = props.getProperty("Partner");
-               routeOffer = props.getProperty("routeOffer");
-
-               subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
-               // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
-               // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
-               // timeoutMs);
-
-               protocol = props.getProperty("Protocol");
-               methodType = props.getProperty("MethodType");
-               dmeuser = props.getProperty("username");
-               dmepassword = props.getProperty("password");
-               contenttype = props.getProperty("contenttype");
-               handlers = props.getProperty("sessionstickinessrequired");
-               // url =protocol+"://DME2SEARCH/"+
-               // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
-               // url = protocol +
-               // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
-
-               /**
-                * Changes to DME2Client url to use Partner for auto failover between
-                * data centers When Partner value is not provided use the routeOffer
-                * value for auto failover within a cluster
-                */
-
-               String preferredRouteKey = readRoute("preferredRouteKey");
-
-               if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
-                       url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
-                                       + "&routeoffer=" + preferredRouteKey;
-               } else if (partner != null && !partner.isEmpty()) {
-                       url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
-               } else if (routeOffer != null && !routeOffer.isEmpty()) {
-                       url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
-                                       + routeOffer;
-               }
-
-               // log.info("url :"+url);
-
-               if (timeoutMs != -1)
-                       url = url + "&timeout=" + timeoutMs;
-               if (limit != -1)
-                       url = url + "&limit=" + limit;
-
-               // Add filter to DME2 Url
-               if (fFilter != null && fFilter.length() > 0)
-                       url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
-
-               DMETimeOuts = new HashMap<String, String>();
-               DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
-               DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
-               DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
-               DMETimeOuts.put("Content-Type", contenttype);
-               System.setProperty("AFT_LATITUDE", latitude);
-               System.setProperty("AFT_LONGITUDE", longitude);
-               System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
-               // System.setProperty("DME2.DEBUG", "true");
-
-               // SSL changes
-               // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-               // "SSLv3,TLSv1,TLSv1.1");
-               System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
-               System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
-               System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-               // SSL changes
-
-               long dme2PerEndPointTimeoutMs;
-               try {
-                       dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
-                       //backward compatibility
-                       if ( dme2PerEndPointTimeoutMs <= 0) {
-                               dme2PerEndPointTimeoutMs =  timeoutMs + defaultDme2PerEndPointTimeoutMs;
-                       }
-               } catch (NumberFormatException nfe) {
-                       //backward compatibility
-                       dme2PerEndPointTimeoutMs =  timeoutMs + defaultDme2PerEndPointTimeoutMs;
-                       getLog().debug("DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + defaultDme2PerEndPointTimeoutMs);
-               }
-
-               try {
-                       dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
-               } catch (NumberFormatException nfe) {
-                       try {
-                               long dme2EpReadTimeoutMs =  Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
-                               long dme2EpConnTimeoutMs =  Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
-                               dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
-                               getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + dme2ReplyHandlerTimeoutMs);
-                       }catch (NumberFormatException e) {
-                               //backward compatibility
-                               dme2ReplyHandlerTimeoutMs =  timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
-                               getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
-                       }
-               }
-               //backward compatibility
-               if ( dme2ReplyHandlerTimeoutMs <= 0) {
-                       dme2ReplyHandlerTimeoutMs =  timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
-               }
-               sender = new DME2Client(new URI(url),dme2PerEndPointTimeoutMs);
-               sender.setAllowAllHttpReturnCodes(true);
-               sender.setMethod(methodType);
-               sender.setSubContext(subContextPath);
-               if (dmeuser != null && dmepassword != null) {
-                       sender.setCredentials(dmeuser, dmepassword);
-               }
-               sender.setHeaders(DMETimeOuts);
-               sender.setPayload("");
-
-               if (handlers!=null&&handlers.equalsIgnoreCase("yes")) {
-                       sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
-                                       props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
-                       sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
-                       sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
-               } else {
-                       sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
-               }
-       }
-
-       public Properties getProps() {
-               return props;
-       }
-
-       public void setProps(Properties props) {
-               this.props = props;
-       }
-
-       protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
-               final StringBuffer contexturl = new StringBuffer(url);
-               // final StringBuffer url = new StringBuffer (
-               // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
-               final StringBuffer adds = new StringBuffer();
-               if (timeoutMs > -1)
-                       adds.append("timeout=").append(timeoutMs);
-               if (limit > -1) {
-                       if (adds.length() > 0) {
-                               adds.append("&");
-                       }
-                       adds.append("limit=").append(limit);
-               }
-               if (fFilter != null && fFilter.length() > 0) {
-                       try {
-                               if (adds.length() > 0) {
-                                       adds.append("&");
-                               }
-                               adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
-                       } catch (UnsupportedEncodingException e) {
-                               log.error("exception at createUrlPath ()  :  ", e);
-                       }
-               }
-               if (adds.length() > 0) {
-                       contexturl.append("?").append(adds.toString());
-               }
-
-               // sender.setSubContext(url.toString());
-               return contexturl.toString();
-       }
-
-       public String getUsername() {
-               return username;
-       }
-
-       public void setUsername(String username) {
-               this.username = username;
-       }
-
-       public String getPassword() {
-               return password;
-       }
-
-       public void setPassword(String password) {
-               this.password = password;
-       }
-
-       public String getHost() {
-               return host;
-       }
-
-       public void setHost(String host) {
-               this.host = host;
-       }
-
-       public String getAuthKey() {
-               return authKey;
-       }
-
-       public void setAuthKey(String authKey) {
-               this.authKey = authKey;
-       }
-
-       public String getAuthDate() {
-               return authDate;
-       }
-
-       public void setAuthDate(String authDate) {
-               this.authDate = authDate;
-       }
-
-       public String getfFilter() {
-               return fFilter;
-       }
-
-       public void setfFilter(String fFilter) {
-               this.fFilter = fFilter;
-       }
-
-       private String readRoute(String routeKey) {
-
-               try {
-
-                       MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
-
-               } catch (Exception ex) {
-                       log.error("Reply Router Error " + ex);
-               }
-               String routeOffer = MRClientFactory.prop.getProperty(routeKey);
-               return routeOffer;
-       }
-
-       @Override
-       public MRConsumerResponse fetchWithReturnConsumerResponse() {
-
-               // fetch with the timeout and limit set in constructor
-               return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
-       }
-
-       @Override
-       public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
-               final LinkedList<String> msgs = new LinkedList<String>();
-               MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
-               try {
-                       if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-                               DMEConfigure(timeoutMs, limit);
-
-                               long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs)  ? dme2ReplyHandlerTimeoutMs :  (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); 
-                               String reply = sender.sendAndWait(timeout);
-
-                               final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
-
-                               if (o != null) {
-                                       final JSONArray a = o.getJSONArray("result");
-
-                                       if (a != null) {
-                                               for (int i = 0; i < a.length(); i++) {
-                                                       if (a.get(i) instanceof String)
-                                                               msgs.add(a.getString(i));
-                                                       else
-                                                               msgs.add(a.getJSONObject(i).toString());
-
-                                               }
-                                       }
-
-                               }
-                               createMRConsumerResponse(reply, mrConsumerResponse);
-                       }
-
-                       if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               /*
-                                * final String urlPath = createUrlPath(
-                                * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
-                                * props.getProperty("Protocol")), timeoutMs, limit);
-                                */
-
-                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
-                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-                               String response = getResponse(urlPath, username, password, protocolFlag);
-
-                               final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
-
-                               if (o != null) {
-                                       final JSONArray a = o.getJSONArray("result");
-
-                                       if (a != null) {
-                                               for (int i = 0; i < a.length(); i++) {
-                                                       if (a.get(i) instanceof String)
-                                                               msgs.add(a.getString(i));
-                                                       else
-                                                               msgs.add(a.getJSONObject(i).toString());
-
-                                               }
-                                       }
-
-                               }
-                               createMRConsumerResponse(response, mrConsumerResponse);
-                       }
-
-                       if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-                               final String urlPath = createUrlPath(
-                                               MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
-                                               timeoutMs, limit);
-
-                               String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
-                               final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
-                               if (o != null) {
-                                       final JSONArray a = o.getJSONArray("result");
-
-                                       if (a != null) {
-                                               for (int i = 0; i < a.length(); i++) {
-                                                       if (a.get(i) instanceof String)
-                                                               msgs.add(a.getString(i));
-                                                       else
-                                                               msgs.add(a.getJSONObject(i).toString());
-
-                                               }
-                                       }
-
-                               }
-                               createMRConsumerResponse(response, mrConsumerResponse);
-                       }
-                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-                               // final String urlPath = createUrlPath(
-                               // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
-                               // props.getProperty("Protocol")), timeoutMs,
-                               // limit);
-                               final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
-                                               fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
-
-                               String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
-                               final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
-                               if (o != null) {
-                                       final JSONArray a = o.getJSONArray("result");
-
-                                       if (a != null) {
-                                               for (int i = 0; i < a.length(); i++) {
-                                                       if (a.get(i) instanceof String)
-                                                               msgs.add(a.getString(i));
-                                                       else
-                                                               msgs.add(a.getJSONObject(i).toString());
-
-                                               }
-                                       }
-
-                               }
-                               createMRConsumerResponse(response, mrConsumerResponse);
-                       }
-
-               } catch (JSONException e) {
-                       mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
-                       mrConsumerResponse.setResponseMessage(e.getMessage());
-                       log.error("json exception: ", e);
-               } catch (HttpException e) {
-                       mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
-                       mrConsumerResponse.setResponseMessage(e.getMessage());
-                       log.error("http exception: ", e);
-               } catch (DME2Exception e) {
-                       mrConsumerResponse.setResponseCode(e.getErrorCode());
-                       mrConsumerResponse.setResponseMessage(e.getErrorMessage());
-                       log.error("DME2 exception: ", e);
-               } catch (Exception e) {
-                       mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
-                       mrConsumerResponse.setResponseMessage(e.getMessage());
-                       log.error("exception: ", e);
-               }
-               mrConsumerResponse.setActualMessages(msgs);
-               return mrConsumerResponse;
-       }
-
-       private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
-
-               if (reply.startsWith("{")) {
-                       JSONObject jObject = new JSONObject(reply);
-                       String message = jObject.getString("message");
-                       int status = jObject.getInt("status");
-
-                       mrConsumerResponse.setResponseCode(Integer.toString(status));
-
-                       if (null != message) {
-                               mrConsumerResponse.setResponseMessage(message);
-                       }
-               } else if (reply.startsWith("<")) {
-                       mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
-                       mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
-               } else {
-                       mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
-                       mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
-               }
-
-       }
-
+    private Logger log = LoggerFactory.getLogger(this.getClass().getName());
+
+    public static final String routerFilePath = null;
+
+    public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+    public String consumerFilePath;
+
+    private static final String SUCCESS_MESSAGE = "Success";
+    private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
+    private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
+
+    private final String fTopic;
+    private final String fGroup;
+    private final String fId;
+    private final int fTimeoutMs;
+    private final int fLimit;
+    private String fFilter;
+    private String username;
+    private String password;
+    private String host;
+    private HostSelector fHostSelector = null;
+    private String url;
+    private DME2Client sender;
+    private String authKey;
+    private String authDate;
+    private Properties props;
+    private HashMap<String, String> DMETimeOuts;
+    private long dme2ReplyHandlerTimeoutMs;
+    private long longPollingMs;
+
+    public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+            final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
+            String apiSecret_password) throws MalformedURLException {
+        this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
+                false);
+    }
+
+    public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+            final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
+            boolean allowSelfSignedCerts) throws MalformedURLException {
+        super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
+
+        fTopic = topic;
+        fGroup = consumerGroup;
+        fId = consumerId;
+        fTimeoutMs = timeoutMs;
+        fLimit = limit;
+        fFilter = filter;
+
+        fHostSelector = new HostSelector(hostPart);
+    }
+
+    @Override
+    public Iterable<String> fetch() throws IOException, Exception {
+        // fetch with the timeout and limit set in constructor
+        return fetch(fTimeoutMs, fLimit);
+    }
+
+    @Override
+    public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
+        final LinkedList<String> msgs = new LinkedList<>();
+
+        try {
+            if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+                dmeConfigure(timeoutMs, limit);
+                try {
+                    String reply = sender.sendAndWait(timeoutMs + 10000L);
+                    final JSONObject o = getResponseDataInJson(reply);
+                    if (o != null) {
+                        final JSONArray a = o.getJSONArray("result");
+                        if (a != null) {
+                            for (int i = 0; i < a.length(); i++) {
+                                if (a.get(i) instanceof String)
+                                    msgs.add(a.getString(i));
+                                else
+                                    msgs.add(a.getJSONObject(i).toString());
+                            }
+                        }
+                    }
+                } catch (JSONException e) {
+                    // unexpected response
+                    reportProblemWithResponse();
+                    log.error("exception: ", e);
+                } catch (HttpException e) {
+                    throw new IOException(e);
+                }
+            }
+
+            if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                        fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+                try {
+                    final JSONObject o = get(urlPath, username, password, protocolFlag);
+
+                    if (o != null) {
+                        final JSONArray a = o.getJSONArray("result");
+                        if (a != null) {
+                            for (int i = 0; i < a.length(); i++) {
+                                if (a.get(i) instanceof String)
+                                    msgs.add(a.getString(i));
+                                else
+                                    msgs.add(a.getJSONObject(i).toString());
+                            }
+                        }
+                    }
+                } catch (JSONException e) {
+                    // unexpected response
+                    reportProblemWithResponse();
+                    log.error("exception: ", e);
+                } catch (HttpException e) {
+                    throw new IOException(e);
+                }
+            }
+
+            if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+                final String urlPath = createUrlPath(
+                        MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+                        timeoutMs, limit);
+
+                try {
+                    final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
+                    if (o != null) {
+                        final JSONArray a = o.getJSONArray("result");
+                        if (a != null) {
+                            for (int i = 0; i < a.length(); i++) {
+                                if (a.get(i) instanceof String)
+                                    msgs.add(a.getString(i));
+                                else
+                                    msgs.add(a.getJSONObject(i).toString());
+                            }
+                        }
+                    }
+                } catch (JSONException e) {
+                    // unexpected response
+                    reportProblemWithResponse();
+                    log.error("exception: ", e);
+                } catch (HttpException e) {
+                    throw new IOException(e);
+                }
+            }
+
+            if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                        fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+                try {
+                    final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
+                    if (o != null) {
+                        final JSONArray a = o.getJSONArray("result");
+                        if (a != null) {
+                            for (int i = 0; i < a.length(); i++) {
+                                if (a.get(i) instanceof String)
+                                    msgs.add(a.getString(i));
+                                else
+                                    msgs.add(a.getJSONObject(i).toString());
+                            }
+                        }
+                    }
+                } catch (JSONException e) {
+                    // unexpected response
+                    reportProblemWithResponse();
+                    log.error("exception: ", e);
+                } catch (HttpException e) {
+                    throw new IOException(e);
+                }
+            }
+        } catch (JSONException e) {
+            // unexpected response
+            reportProblemWithResponse();
+            log.error("exception: ", e);
+        } catch (HttpException e) {
+            throw new IOException(e);
+        } catch (Exception e) {
+            throw e;
+        }
+
+        return msgs;
+    }
+
+    @Override
+    public MRConsumerResponse fetchWithReturnConsumerResponse() {
+        // fetch with the timeout and limit set in constructor
+        return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
+    }
+
+    @Override
+    public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
+        final LinkedList<String> msgs = new LinkedList<String>();
+        MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
+        try {
+            if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+                dmeConfigure(timeoutMs, limit);
+
+                long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
+                        : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
+                String reply = sender.sendAndWait(timeout);
+
+                final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
+
+                if (o != null) {
+                    final JSONArray a = o.getJSONArray("result");
+
+                    if (a != null) {
+                        for (int i = 0; i < a.length(); i++) {
+                            if (a.get(i) instanceof String)
+                                msgs.add(a.getString(i));
+                            else
+                                msgs.add(a.getJSONObject(i).toString());
+                        }
+                    }
+                }
+                createMRConsumerResponse(reply, mrConsumerResponse);
+            }
+
+            if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                        fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+                String response = getResponse(urlPath, username, password, protocolFlag);
+                final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+                if (o != null) {
+                    final JSONArray a = o.getJSONArray("result");
+
+                    if (a != null) {
+                        for (int i = 0; i < a.length(); i++) {
+                            if (a.get(i) instanceof String)
+                                msgs.add(a.getString(i));
+                            else
+                                msgs.add(a.getJSONObject(i).toString());
+                        }
+                    }
+                }
+                createMRConsumerResponse(response, mrConsumerResponse);
+            }
+
+            if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+                final String urlPath = createUrlPath(
+                        MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+                        timeoutMs, limit);
+
+                String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
+                final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+                if (o != null) {
+                    final JSONArray a = o.getJSONArray("result");
+
+                    if (a != null) {
+                        for (int i = 0; i < a.length(); i++) {
+                            if (a.get(i) instanceof String)
+                                msgs.add(a.getString(i));
+                            else
+                                msgs.add(a.getJSONObject(i).toString());
+                        }
+                    }
+                }
+                createMRConsumerResponse(response, mrConsumerResponse);
+            }
+
+            if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+                        fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+                String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
+                final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+                if (o != null) {
+                    final JSONArray a = o.getJSONArray("result");
+
+                    if (a != null) {
+                        for (int i = 0; i < a.length(); i++) {
+                            if (a.get(i) instanceof String)
+                                msgs.add(a.getString(i));
+                            else
+                                msgs.add(a.getJSONObject(i).toString());
+                        }
+                    }
+                }
+                createMRConsumerResponse(response, mrConsumerResponse);
+            }
+
+        } catch (JSONException e) {
+            mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+            mrConsumerResponse.setResponseMessage(e.getMessage());
+            log.error("json exception: ", e);
+        } catch (HttpException e) {
+            mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+            mrConsumerResponse.setResponseMessage(e.getMessage());
+            log.error("http exception: ", e);
+        } catch (DME2Exception e) {
+            mrConsumerResponse.setResponseCode(e.getErrorCode());
+            mrConsumerResponse.setResponseMessage(e.getErrorMessage());
+            log.error("DME2 exception: ", e);
+        } catch (Exception e) {
+            mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+            mrConsumerResponse.setResponseMessage(e.getMessage());
+            log.error("exception: ", e);
+        }
+        mrConsumerResponse.setActualMessages(msgs);
+        return mrConsumerResponse;
+    }
+
+    private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
+        if (reply.startsWith("{")) {
+            JSONObject jObject = new JSONObject(reply);
+            String message = jObject.getString("message");
+            int status = jObject.getInt("status");
+
+            mrConsumerResponse.setResponseCode(Integer.toString(status));
+
+            if (null != message) {
+                mrConsumerResponse.setResponseMessage(message);
+            }
+        } else if (reply.startsWith("<")) {
+            mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
+            mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+        } else {
+            mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+            mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
+        }
+    }
+
+    private JSONObject getResponseDataInJson(String response) {
+        try {
+            JSONTokener jsonTokener = new JSONTokener(response);
+            JSONObject jsonObject = null;
+            final char firstChar = jsonTokener.next();
+            jsonTokener.back();
+            if ('[' == firstChar) {
+                JSONArray jsonArray = new JSONArray(jsonTokener);
+                jsonObject = new JSONObject();
+                jsonObject.put("result", jsonArray);
+            } else {
+                jsonObject = new JSONObject(jsonTokener);
+            }
+
+            return jsonObject;
+        } catch (JSONException excp) {
+            log.error("DMAAP - Error reading response data.", excp);
+            return null;
+        }
+    }
+
+    private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
+        JSONTokener jsonTokener = new JSONTokener(response);
+        JSONObject jsonObject = null;
+        final char firstChar = jsonTokener.next();
+        jsonTokener.back();
+        if (null != response && response.length() == 0) {
+            return null;
+        }
+
+        if ('[' == firstChar) {
+            JSONArray jsonArray = new JSONArray(jsonTokener);
+            jsonObject = new JSONObject();
+            jsonObject.put("result", jsonArray);
+        } else if ('{' == firstChar) {
+            return null;
+        } else if ('<' == firstChar) {
+            return null;
+        } else {
+            jsonObject = new JSONObject(jsonTokener);
+        }
+
+        return jsonObject;
+    }
+
+    private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+        this.longPollingMs = timeoutMs;
+        String latitude = props.getProperty("Latitude");
+        String longitude = props.getProperty("Longitude");
+        String version = props.getProperty("Version");
+        String serviceName = props.getProperty("ServiceName");
+        String env = props.getProperty("Environment");
+        String partner = props.getProperty("Partner");
+        String routeOffer = props.getProperty("routeOffer");
+        String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
+        String protocol = props.getProperty("Protocol");
+        String methodType = props.getProperty("MethodType");
+        String dmeuser = props.getProperty("username");
+        String dmepassword = props.getProperty("password");
+        String contenttype = props.getProperty("contenttype");
+        String handlers = props.getProperty("sessionstickinessrequired");
+
+        /**
+         * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
+         * provided use the routeOffer value for auto failover within a cluster
+         */
+
+        String preferredRouteKey = readRoute("preferredRouteKey");
+
+        if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
+            url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
+                    + "&routeoffer=" + preferredRouteKey;
+        } else if (partner != null && !partner.isEmpty()) {
+            url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+        } else if (routeOffer != null && !routeOffer.isEmpty()) {
+            url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+                    + routeOffer;
+        }
+
+        if (timeoutMs != -1)
+            url = url + "&timeout=" + timeoutMs;
+        if (limit != -1)
+            url = url + "&limit=" + limit;
+
+        // Add filter to DME2 Url
+        if (fFilter != null && fFilter.length() > 0)
+            url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
+
+        DMETimeOuts = new HashMap<>();
+        DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+        DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+        DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+        DMETimeOuts.put("Content-Type", contenttype);
+        System.setProperty("AFT_LATITUDE", latitude);
+        System.setProperty("AFT_LONGITUDE", longitude);
+        System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+
+        // SSL changes
+        System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+        System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+        System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+        // SSL changes
+
+        long dme2PerEndPointTimeoutMs;
+        try {
+            dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
+            // backward compatibility
+            if (dme2PerEndPointTimeoutMs <= 0) {
+                dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
+            }
+        } catch (NumberFormatException nfe) {
+            // backward compatibility
+            dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
+            getLog().debug(
+                    "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
+        }
+
+        try {
+            dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
+        } catch (NumberFormatException nfe) {
+            try {
+                long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+                long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+                dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
+                getLog().debug(
+                        "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
+                                + dme2ReplyHandlerTimeoutMs);
+            } catch (NumberFormatException e) {
+                // backward compatibility
+                dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
+                getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
+            }
+        }
+        // backward compatibility
+        if (dme2ReplyHandlerTimeoutMs <= 0) {
+            dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
+        }
+
+        sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
+        sender.setAllowAllHttpReturnCodes(true);
+        sender.setMethod(methodType);
+        sender.setSubContext(subContextPath);
+        if (dmeuser != null && dmepassword != null) {
+            sender.setCredentials(dmeuser, dmepassword);
+        }
+        sender.setHeaders(DMETimeOuts);
+        sender.setPayload("");
+        if (handlers != null && handlers.equalsIgnoreCase("yes")) {
+            sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+                    props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+            sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+            sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+        } else {
+            sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+        }
+    }
+
+    protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
+        final StringBuilder contexturl = new StringBuilder(url);
+        final StringBuilder adds = new StringBuilder();
+
+        if (timeoutMs > -1) {
+            adds.append("timeout=").append(timeoutMs);
+        }
+
+        if (limit > -1) {
+            if (adds.length() > 0) {
+                adds.append("&");
+            }
+            adds.append("limit=").append(limit);
+        }
+
+        if (fFilter != null && fFilter.length() > 0) {
+            try {
+                if (adds.length() > 0) {
+                    adds.append("&");
+                }
+                adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
+            } catch (UnsupportedEncodingException e) {
+                log.error("exception at createUrlPath ()  :  ", e);
+            }
+        }
+
+        if (adds.length() > 0) {
+            contexturl.append("?").append(adds.toString());
+        }
+
+        return contexturl.toString();
+    }
+
+    private String readRoute(String routeKey) {
+        try {
+            MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
+        } catch (Exception ex) {
+            log.error("Reply Router Error " + ex);
+        }
+        return MRClientFactory.prop.getProperty(routeKey);
+    }
+
+    public static List<String> stringToList(String str) {
+        final LinkedList<String> set = new LinkedList<>();
+        if (str != null) {
+            final String[] parts = str.trim().split(",");
+            for (String part : parts) {
+                final String trimmed = part.trim();
+                if (trimmed.length() > 0) {
+                    set.add(trimmed);
+                }
+            }
+        }
+        return set;
+    }
+
+    public static String getRouterFilePath() {
+        return routerFilePath;
+    }
+
+    public static void setRouterFilePath(String routerFilePath) {
+        MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+    }
+
+    public String getConsumerFilePath() {
+        return consumerFilePath;
+    }
+
+    public void setConsumerFilePath(String consumerFilePath) {
+        this.consumerFilePath = consumerFilePath;
+    }
+
+    public String getProtocolFlag() {
+        return protocolFlag;
+    }
+
+    public void setProtocolFlag(String protocolFlag) {
+        this.protocolFlag = protocolFlag;
+    }
+
+    public Properties getProps() {
+        return props;
+    }
+
+    public void setProps(Properties props) {
+        this.props = props;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public String getAuthKey() {
+        return authKey;
+    }
+
+    public void setAuthKey(String authKey) {
+        this.authKey = authKey;
+    }
+
+    public String getAuthDate() {
+        return authDate;
+    }
+
+    public void setAuthDate(String authDate) {
+        this.authDate = authDate;
+    }
+
+    public String getfFilter() {
+        return fFilter;
+    }
+
+    public void setfFilter(String fFilter) {
+        this.fFilter = fFilter;
+    }
 }