read and set the jersey client properties
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  * 
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  * 
21  *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
23
24 import com.att.aft.dme2.api.DME2Client;
25 import com.att.aft.dme2.api.DME2Exception;
26 import org.onap.dmaap.mr.client.HostSelector;
27 import org.onap.dmaap.mr.client.MRClientFactory;
28 import org.onap.dmaap.mr.client.MRConsumer;
29 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
30 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
31 import java.io.File;
32 import java.io.FileReader;
33 import java.io.IOException;
34 import java.io.UnsupportedEncodingException;
35 import java.net.MalformedURLException;
36 import java.net.URI;
37 import java.net.URISyntaxException;
38 import java.net.URLEncoder;
39 import java.util.Collection;
40 import java.util.HashMap;
41 import java.util.LinkedList;
42 import java.util.List;
43 import java.util.Properties;
44 import java.util.concurrent.TimeUnit;
45 import org.apache.http.HttpException;
46 import org.apache.http.HttpStatus;
47 import org.json.JSONArray;
48 import org.json.JSONException;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
55
56     private Logger log = LoggerFactory.getLogger(this.getClass().getName());
57
58     public static final String routerFilePath = null;
59
60     public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
61     public String consumerFilePath;
62
63     private static final String SUCCESS_MESSAGE = "Success";
64     private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
65     private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
66
67     private final String fTopic;
68     private final String fGroup;
69     private final String fId;
70     private final int fTimeoutMs;
71     private final int fLimit;
72     private String fFilter;
73     private String username;
74     private String password;
75     private String host;
76     private HostSelector fHostSelector = null;
77     private String url;
78     private DME2Client sender;
79     private String authKey;
80     private String authDate;
81     private Properties props;
82     private HashMap<String, String> DMETimeOuts;
83     private long dme2ReplyHandlerTimeoutMs;
84     private long longPollingMs;
85
86     public MRConsumerImpl(MRConsumerImplBuilder builder) throws MalformedURLException {
87         super(builder.hostPart,
88                 builder.topic + "::" + builder.consumerGroup + "::" + builder.consumerId);
89
90         fTopic = builder.topic;
91         fGroup = builder.consumerGroup;
92         fId = builder.consumerId;
93         fTimeoutMs = builder.timeoutMs;
94         fLimit = builder.limit;
95         fFilter = builder.filter;
96
97         fHostSelector = new HostSelector(builder.hostPart);
98     }
99
100     public static class MRConsumerImplBuilder {
101         private Collection<String> hostPart;
102         private String topic;
103         private String consumerGroup;
104         private String consumerId;
105         private int timeoutMs;
106         private int limit;
107         private String filter;
108         private String apiKey_username;
109         private String apiSecret_password;
110         private String apiKey;
111         private String apiSecret;
112         private boolean allowSelfSignedCerts = false;
113
114         public MRConsumerImplBuilder setHostPart(Collection<String> hostPart) {
115             this.hostPart = hostPart;
116             return this;
117         }
118
119         public MRConsumerImplBuilder setTopic(String topic) {
120             this.topic = topic;
121             return this;
122         }
123
124         public MRConsumerImplBuilder setConsumerGroup(String consumerGroup) {
125             this.consumerGroup = consumerGroup;
126             return this;
127         }
128
129         public MRConsumerImplBuilder setConsumerId(String consumerId) {
130             this.consumerId = consumerId;
131             return this;
132         }
133
134         public MRConsumerImplBuilder setTimeoutMs(int timeoutMs) {
135             this.timeoutMs = timeoutMs;
136             return this;
137         }
138
139         public MRConsumerImplBuilder setLimit(int limit) {
140             this.limit = limit;
141             return this;
142         }
143
144         public MRConsumerImplBuilder setFilter(String filter) {
145             this.filter = filter;
146             return this;
147         }
148
149         public MRConsumerImplBuilder setApiKey_username(String apiKey_username) {
150             this.apiKey_username = apiKey_username;
151             return this;
152         }
153
154         public MRConsumerImplBuilder setApiSecret_password(String apiSecret_password) {
155             this.apiSecret_password = apiSecret_password;
156             return this;
157         }
158
159         public MRConsumerImplBuilder setApiKey(String apiKey) {
160             this.apiKey = apiKey;
161             return this;
162         }
163
164         public MRConsumerImplBuilder setApiSecret(String apiSecret) {
165             this.apiSecret = apiSecret;
166             return this;
167         }
168
169         public MRConsumerImplBuilder setAllowSelfSignedCerts(boolean allowSelfSignedCerts) {
170             this.allowSelfSignedCerts = allowSelfSignedCerts;
171             return this;
172         }
173
174         public MRConsumerImpl createMRConsumerImpl() throws MalformedURLException {
175             return new MRConsumerImpl(this);
176         }
177     }
178
179     @Override
180     public Iterable<String> fetch() throws IOException, Exception {
181         // fetch with the timeout and limit set in constructor
182         return fetch(fTimeoutMs, fLimit);
183     }
184
185     @Override
186     public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
187         final LinkedList<String> msgs = new LinkedList<>();
188
189         try {
190             if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
191                 dmeConfigure(timeoutMs, limit);
192                 try {
193                     String reply = sender.sendAndWait(timeoutMs + 10000L);
194                     final JSONObject o = getResponseDataInJson(reply);
195                     if (o != null) {
196                         final JSONArray a = o.getJSONArray("result");
197                         if (a != null) {
198                             for (int i = 0; i < a.length(); i++) {
199                                 if (a.get(i) instanceof String)
200                                     msgs.add(a.getString(i));
201                                 else
202                                     msgs.add(a.getJSONObject(i).toString());
203                             }
204                         }
205                     }
206                 } catch (JSONException e) {
207                     // unexpected response
208                     reportProblemWithResponse();
209                     log.error("exception: ", e);
210                 } catch (HttpException e) {
211                     throw new IOException(e);
212                 }
213             }
214
215             if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
216                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
217                         fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
218
219                 try {
220                     final JSONObject o = get(urlPath, username, password, protocolFlag);
221
222                     if (o != null) {
223                         final JSONArray a = o.getJSONArray("result");
224                         if (a != null) {
225                             for (int i = 0; i < a.length(); i++) {
226                                 if (a.get(i) instanceof String)
227                                     msgs.add(a.getString(i));
228                                 else
229                                     msgs.add(a.getJSONObject(i).toString());
230                             }
231                         }
232                     }
233                 } catch (JSONException e) {
234                     // unexpected response
235                     reportProblemWithResponse();
236                     log.error("exception: ", e);
237                 } catch (HttpException e) {
238                     throw new IOException(e);
239                 }
240             }
241
242             if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
243                 final String urlPath = createUrlPath(
244                         MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
245                         timeoutMs, limit);
246
247                 try {
248                     final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
249                     if (o != null) {
250                         final JSONArray a = o.getJSONArray("result");
251                         if (a != null) {
252                             for (int i = 0; i < a.length(); i++) {
253                                 if (a.get(i) instanceof String)
254                                     msgs.add(a.getString(i));
255                                 else
256                                     msgs.add(a.getJSONObject(i).toString());
257                             }
258                         }
259                     }
260                 } catch (JSONException e) {
261                     // unexpected response
262                     reportProblemWithResponse();
263                     log.error("exception: ", e);
264                 } catch (HttpException e) {
265                     throw new IOException(e);
266                 }
267             }
268
269             if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
270                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
271                         fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
272
273                 try {
274                     final JSONObject o = getNoAuth(urlPath);
275                     if (o != null) {
276                         final JSONArray a = o.getJSONArray("result");
277                         if (a != null) {
278                             for (int i = 0; i < a.length(); i++) {
279                                 if (a.get(i) instanceof String)
280                                     msgs.add(a.getString(i));
281                                 else
282                                     msgs.add(a.getJSONObject(i).toString());
283                             }
284                         }
285                     }
286                 } catch (JSONException e) {
287                     // unexpected response
288                     reportProblemWithResponse();
289                     log.error("exception: ", e);
290                 } catch (HttpException e) {
291                     throw new IOException(e);
292                 }
293             }
294         } catch (JSONException e) {
295             // unexpected response
296             reportProblemWithResponse();
297             log.error("exception: ", e);
298         } catch (HttpException e) {
299             throw new IOException(e);
300         } catch (Exception e) {
301             throw e;
302         }
303
304         return msgs;
305     }
306
307     @Override
308     public MRConsumerResponse fetchWithReturnConsumerResponse() {
309         // fetch with the timeout and limit set in constructor
310         return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
311     }
312
313     @Override
314     public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
315         final LinkedList<String> msgs = new LinkedList<>();
316         MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
317         try {
318             if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
319                 dmeConfigure(timeoutMs, limit);
320
321                 long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
322                         : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
323                 String reply = sender.sendAndWait(timeout);
324
325                 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
326
327                 if (o != null) {
328                     final JSONArray a = o.getJSONArray("result");
329
330                     if (a != null) {
331                         for (int i = 0; i < a.length(); i++) {
332                             if (a.get(i) instanceof String)
333                                 msgs.add(a.getString(i));
334                             else
335                                 msgs.add(a.getJSONObject(i).toString());
336                         }
337                     }
338                 }
339                 createMRConsumerResponse(reply, mrConsumerResponse);
340             }
341
342             if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
343                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
344                         fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
345
346                 String response = getResponse(urlPath, username, password, protocolFlag);
347                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
348                 if (o != null) {
349                     final JSONArray a = o.getJSONArray("result");
350
351                     if (a != null) {
352                         for (int i = 0; i < a.length(); i++) {
353                             if (a.get(i) instanceof String)
354                                 msgs.add(a.getString(i));
355                             else
356                                 msgs.add(a.getJSONObject(i).toString());
357                         }
358                     }
359                 }
360                 createMRConsumerResponse(response, mrConsumerResponse);
361             }
362
363             if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
364                 final String urlPath = createUrlPath(
365                         MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
366                         timeoutMs, limit);
367
368                 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
369                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
370                 if (o != null) {
371                     final JSONArray a = o.getJSONArray("result");
372
373                     if (a != null) {
374                         for (int i = 0; i < a.length(); i++) {
375                             if (a.get(i) instanceof String)
376                                 msgs.add(a.getString(i));
377                             else
378                                 msgs.add(a.getJSONObject(i).toString());
379                         }
380                     }
381                 }
382                 createMRConsumerResponse(response, mrConsumerResponse);
383             }
384
385             if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
386                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
387                         fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
388
389                 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
390                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
391                 if (o != null) {
392                     final JSONArray a = o.getJSONArray("result");
393
394                     if (a != null) {
395                         for (int i = 0; i < a.length(); i++) {
396                             if (a.get(i) instanceof String)
397                                 msgs.add(a.getString(i));
398                             else
399                                 msgs.add(a.getJSONObject(i).toString());
400                         }
401                     }
402                 }
403                 createMRConsumerResponse(response, mrConsumerResponse);
404             }
405
406         } catch (JSONException e) {
407             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
408             mrConsumerResponse.setResponseMessage(e.getMessage());
409             log.error("json exception: ", e);
410         } catch (HttpException e) {
411             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
412             mrConsumerResponse.setResponseMessage(e.getMessage());
413             log.error("http exception: ", e);
414         } catch (DME2Exception e) {
415             mrConsumerResponse.setResponseCode(e.getErrorCode());
416             mrConsumerResponse.setResponseMessage(e.getErrorMessage());
417             log.error("DME2 exception: ", e);
418         } catch (Exception e) {
419             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
420             mrConsumerResponse.setResponseMessage(e.getMessage());
421             log.error("exception: ", e);
422         }
423         mrConsumerResponse.setActualMessages(msgs);
424         return mrConsumerResponse;
425     }
426
427     @Override
428     protected void reportProblemWithResponse() {
429         log.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
430         super.reportProblemWithResponse();
431         fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
432     }
433
434     private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
435         if (reply.startsWith("{")) {
436             JSONObject jObject = new JSONObject(reply);
437             String message = jObject.getString("message");
438             int status = jObject.getInt("status");
439
440             mrConsumerResponse.setResponseCode(Integer.toString(status));
441
442             if (null != message) {
443                 mrConsumerResponse.setResponseMessage(message);
444             }
445         } else if (reply.startsWith("<")) {
446             mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
447             mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
448         } else {
449             mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
450             mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
451         }
452     }
453
454     private JSONObject getResponseDataInJson(String response) {
455         try {
456             JSONTokener jsonTokener = new JSONTokener(response);
457             JSONObject jsonObject = null;
458             final char firstChar = jsonTokener.next();
459             jsonTokener.back();
460             if ('[' == firstChar) {
461                 JSONArray jsonArray = new JSONArray(jsonTokener);
462                 jsonObject = new JSONObject();
463                 jsonObject.put("result", jsonArray);
464             } else {
465                 jsonObject = new JSONObject(jsonTokener);
466             }
467
468             return jsonObject;
469         } catch (JSONException excp) {
470             log.error("DMAAP - Error reading response data.", excp);
471             return null;
472         }
473     }
474
475     private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
476         JSONTokener jsonTokener = new JSONTokener(response);
477         JSONObject jsonObject = null;
478         final char firstChar = jsonTokener.next();
479         jsonTokener.back();
480         if (null != response && response.length() == 0) {
481             return null;
482         }
483
484         if ('[' == firstChar) {
485             JSONArray jsonArray = new JSONArray(jsonTokener);
486             jsonObject = new JSONObject();
487             jsonObject.put("result", jsonArray);
488         } else if ('{' == firstChar) {
489             return null;
490         } else if ('<' == firstChar) {
491             return null;
492         } else {
493             jsonObject = new JSONObject(jsonTokener);
494         }
495
496         return jsonObject;
497     }
498
499     private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
500         this.longPollingMs = timeoutMs;
501         String latitude = props.getProperty("Latitude");
502         String longitude = props.getProperty("Longitude");
503         String version = props.getProperty("Version");
504         String serviceName = props.getProperty("ServiceName");
505         String env = props.getProperty("Environment");
506         String partner = props.getProperty("Partner");
507         String routeOffer = props.getProperty("routeOffer");
508         String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
509         String protocol = props.getProperty("Protocol");
510         String methodType = props.getProperty("MethodType");
511         String dmeuser = props.getProperty("username");
512         String dmepassword = props.getProperty("password");
513         String contenttype = props.getProperty("contenttype");
514         String handlers = props.getProperty("sessionstickinessrequired");
515
516         /**
517          * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
518          * provided use the routeOffer value for auto failover within a cluster
519          */
520
521         String preferredRouteKey = readRoute("preferredRouteKey");
522
523         if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
524             url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
525                     + "&routeoffer=" + preferredRouteKey;
526         } else if (partner != null && !partner.isEmpty()) {
527             url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
528         } else if (routeOffer != null && !routeOffer.isEmpty()) {
529             url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
530                     + routeOffer;
531         }
532
533         if (timeoutMs != -1)
534             url = url + "&timeout=" + timeoutMs;
535         if (limit != -1)
536             url = url + "&limit=" + limit;
537
538         // Add filter to DME2 Url
539         if (fFilter != null && fFilter.length() > 0)
540             url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
541
542         DMETimeOuts = new HashMap<>();
543         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
544         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
545         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
546         DMETimeOuts.put("Content-Type", contenttype);
547         System.setProperty("AFT_LATITUDE", latitude);
548         System.setProperty("AFT_LONGITUDE", longitude);
549         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
550
551         // SSL changes
552         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
553         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
554         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
555         // SSL changes
556
557         long dme2PerEndPointTimeoutMs;
558         try {
559             dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
560             // backward compatibility
561             if (dme2PerEndPointTimeoutMs <= 0) {
562                 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
563             }
564         } catch (NumberFormatException nfe) {
565             // backward compatibility
566             dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
567             getLog().debug(
568                     "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
569         }
570
571         try {
572             dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
573         } catch (NumberFormatException nfe) {
574             try {
575                 long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
576                 long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
577                 dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
578                 getLog().debug(
579                         "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
580                                 + dme2ReplyHandlerTimeoutMs);
581             } catch (NumberFormatException e) {
582                 // backward compatibility
583                 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
584                 getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
585             }
586         }
587         // backward compatibility
588         if (dme2ReplyHandlerTimeoutMs <= 0) {
589             dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
590         }
591
592         sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
593         sender.setAllowAllHttpReturnCodes(true);
594         sender.setMethod(methodType);
595         sender.setSubContext(subContextPath);
596         if (dmeuser != null && dmepassword != null) {
597             sender.setCredentials(dmeuser, dmepassword);
598         }
599         sender.setHeaders(DMETimeOuts);
600         sender.setPayload("");
601         if (handlers != null && handlers.equalsIgnoreCase("yes")) {
602             sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
603                     props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
604             sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
605             sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
606         } else {
607             sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
608         }
609     }
610
611     protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
612         final StringBuilder contexturl = new StringBuilder(url);
613         final StringBuilder adds = new StringBuilder();
614
615         if (timeoutMs > -1) {
616             adds.append("timeout=").append(timeoutMs);
617         }
618
619         if (limit > -1) {
620             if (adds.length() > 0) {
621                 adds.append("&");
622             }
623             adds.append("limit=").append(limit);
624         }
625
626         if (fFilter != null && fFilter.length() > 0) {
627             try {
628                 if (adds.length() > 0) {
629                     adds.append("&");
630                 }
631                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
632             } catch (UnsupportedEncodingException e) {
633                 log.error("exception at createUrlPath ()  :  ", e);
634             }
635         }
636
637         if (adds.length() > 0) {
638             contexturl.append("?").append(adds.toString());
639         }
640
641         return contexturl.toString();
642     }
643
644     private String readRoute(String routeKey) {
645         try {
646             MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
647         } catch (Exception ex) {
648             log.error("Reply Router Error " + ex);
649         }
650         return MRClientFactory.prop.getProperty(routeKey);
651     }
652
653     public static List<String> stringToList(String str) {
654         final LinkedList<String> set = new LinkedList<>();
655         if (str != null) {
656             final String[] parts = str.trim().split(",");
657             for (String part : parts) {
658                 final String trimmed = part.trim();
659                 if (trimmed.length() > 0) {
660                     set.add(trimmed);
661                 }
662             }
663         }
664         return set;
665     }
666
667     public static String getRouterFilePath() {
668         return routerFilePath;
669     }
670
671     public static void setRouterFilePath(String routerFilePath) {
672         MRSimplerBatchPublisher.routerFilePath = routerFilePath;
673     }
674
675     public String getConsumerFilePath() {
676         return consumerFilePath;
677     }
678
679     public void setConsumerFilePath(String consumerFilePath) {
680         this.consumerFilePath = consumerFilePath;
681     }
682
683     public String getProtocolFlag() {
684         return protocolFlag;
685     }
686
687     public void setProtocolFlag(String protocolFlag) {
688         this.protocolFlag = protocolFlag;
689     }
690
691     public Properties getProps() {
692         return props;
693     }
694
695     public void setProps(Properties props) {
696         this.props = props;
697         setClientConfig(DmaapClientUtil.getClientConfig(props));
698     }
699
700     public String getUsername() {
701         return username;
702     }
703
704     public void setUsername(String username) {
705         this.username = username;
706     }
707
708     public String getPassword() {
709         return password;
710     }
711
712     public void setPassword(String password) {
713         this.password = password;
714     }
715
716     public String getHost() {
717         return host;
718     }
719
720     public void setHost(String host) {
721         this.host = host;
722     }
723
724     public String getAuthKey() {
725         return authKey;
726     }
727
728     public void setAuthKey(String authKey) {
729         this.authKey = authKey;
730     }
731
732     public String getAuthDate() {
733         return authDate;
734     }
735
736     public void setAuthDate(String authDate) {
737         this.authDate = authDate;
738     }
739
740     public String getfFilter() {
741         return fFilter;
742     }
743
744     public void setfFilter(String fFilter) {
745         this.fFilter = fFilter;
746     }
747 }