First sonar issues review
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  * 
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  * 
21  *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
23
24 import com.att.aft.dme2.api.DME2Client;
25 import com.att.aft.dme2.api.DME2Exception;
26 import org.onap.dmaap.mr.client.HostSelector;
27 import org.onap.dmaap.mr.client.MRClientFactory;
28 import org.onap.dmaap.mr.client.MRConsumer;
29 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
30 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
31
32 import java.io.*;
33 import java.net.MalformedURLException;
34 import java.net.URI;
35 import java.net.URISyntaxException;
36 import java.net.URLEncoder;
37 import java.util.*;
38 import java.util.concurrent.TimeUnit;
39 import org.apache.http.HttpException;
40 import org.apache.http.HttpStatus;
41 import org.json.JSONArray;
42 import org.json.JSONException;
43 import org.json.JSONObject;
44 import org.json.JSONTokener;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
49
50     private Logger log = LoggerFactory.getLogger(this.getClass().getName());
51
52     public static final String ROUTER_FILE_PATH = null;
53
54     public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
55     public String consumerFilePath;
56
57     private static final String JSON_RESULT = "result";
58     private static final String PROPS_PROTOCOL = "Protocol";
59
60     private static final String EXECPTION_MESSAGE = "exception: ";
61     private static final String SUCCESS_MESSAGE = "Success";
62     private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
63     private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
64
65     private static final String URL_PARAM_ROUTE_OFFER = "routeoffer";
66     private static final String URL_PARAM_PARTNER = "partner";
67     private static final String URL_PARAM_ENV_CONTEXT = "envContext";
68     private static final String URL_PARAM_VERSION = "version";
69     private static final String URL_PARAM_FILTER = "filter";
70     private static final String URL_PARAM_LIMIT = "limit";
71     private static final String URL_PARAM_TIMEOUT = "timeout";
72
73     private final String fTopic;
74     private final String fGroup;
75     private final String fId;
76     private final int fTimeoutMs;
77     private final int fLimit;
78     private String fFilter;
79     private String username;
80     private String password;
81     private String host;
82     private HostSelector fHostSelector = null;
83     private String url;
84     private DME2Client sender;
85     private String authKey;
86     private String authDate;
87     private Properties props;
88     private HashMap<String, String> DMETimeOuts;
89     private long dme2ReplyHandlerTimeoutMs;
90     private long longPollingMs;
91
92     public MRConsumerImpl(MRConsumerImplBuilder builder) throws MalformedURLException {
93         super(builder.hostPart,
94                 builder.topic + "::" + builder.consumerGroup + "::" + builder.consumerId);
95
96         fTopic = builder.topic;
97         fGroup = builder.consumerGroup;
98         fId = builder.consumerId;
99         fTimeoutMs = builder.timeoutMs;
100         fLimit = builder.limit;
101         fFilter = builder.filter;
102
103         fHostSelector = new HostSelector(builder.hostPart);
104     }
105
106     public static class MRConsumerImplBuilder {
107         private Collection<String> hostPart;
108         private String topic;
109         private String consumerGroup;
110         private String consumerId;
111         private int timeoutMs;
112         private int limit;
113         private String filter;
114         private String apiKey_username;
115         private String apiSecret_password;
116         private String apiKey;
117         private String apiSecret;
118         private boolean allowSelfSignedCerts = false;
119
120         public MRConsumerImplBuilder setHostPart(Collection<String> hostPart) {
121             this.hostPart = hostPart;
122             return this;
123         }
124
125         public MRConsumerImplBuilder setTopic(String topic) {
126             this.topic = topic;
127             return this;
128         }
129
130         public MRConsumerImplBuilder setConsumerGroup(String consumerGroup) {
131             this.consumerGroup = consumerGroup;
132             return this;
133         }
134
135         public MRConsumerImplBuilder setConsumerId(String consumerId) {
136             this.consumerId = consumerId;
137             return this;
138         }
139
140         public MRConsumerImplBuilder setTimeoutMs(int timeoutMs) {
141             this.timeoutMs = timeoutMs;
142             return this;
143         }
144
145         public MRConsumerImplBuilder setLimit(int limit) {
146             this.limit = limit;
147             return this;
148         }
149
150         public MRConsumerImplBuilder setFilter(String filter) {
151             this.filter = filter;
152             return this;
153         }
154
155         public MRConsumerImplBuilder setApiKey_username(String apiKey_username) {
156             this.apiKey_username = apiKey_username;
157             return this;
158         }
159
160         public MRConsumerImplBuilder setApiSecret_password(String apiSecret_password) {
161             this.apiSecret_password = apiSecret_password;
162             return this;
163         }
164
165         public MRConsumerImplBuilder setApiKey(String apiKey) {
166             this.apiKey = apiKey;
167             return this;
168         }
169
170         public MRConsumerImplBuilder setApiSecret(String apiSecret) {
171             this.apiSecret = apiSecret;
172             return this;
173         }
174
175         public MRConsumerImplBuilder setAllowSelfSignedCerts(boolean allowSelfSignedCerts) {
176             this.allowSelfSignedCerts = allowSelfSignedCerts;
177             return this;
178         }
179
180         public MRConsumerImpl createMRConsumerImpl() throws MalformedURLException {
181             return new MRConsumerImpl(this);
182         }
183     }
184
185     @Override
186     public Iterable<String> fetch() throws IOException, Exception {
187         // fetch with the timeout and limit set in constructor
188         return fetch(fTimeoutMs, fLimit);
189     }
190
191     @Override
192     public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
193         final LinkedList<String> msgs = new LinkedList<>();
194
195         ProtocolTypeConstants protocolFlagEnum = null;
196         for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) {
197             if (type.getValue().equalsIgnoreCase(protocolFlag)) {
198                 protocolFlagEnum = type;
199             }
200         }
201         if (protocolFlagEnum == null) {
202             return msgs;
203         }
204
205         try {
206             switch (protocolFlagEnum) {
207                 case DME2:
208                     dmeConfigure(timeoutMs, limit);
209                     String reply = sender.sendAndWait(timeoutMs + 10000L);
210                     readJsonData(msgs, getResponseDataInJson(reply));
211                     break;
212                 case AAF_AUTH:
213                     String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
214                             fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
215                     final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
216                     readJsonData(msgs, o);
217                     break;
218                 case AUTH_KEY:
219                     final String urlKeyPath = createUrlPath(
220                             MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
221                             timeoutMs, limit);
222                     final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
223                     readJsonData(msgs, authObject);
224                     break;
225                 case HTTPNOAUTH:
226                     final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
227                             fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
228                     readJsonData(msgs, getNoAuth(urlNoAuthPath));
229                     break;
230             }
231         } catch (JSONException e) {
232             // unexpected response
233             reportProblemWithResponse();
234             log.error(EXECPTION_MESSAGE, e);
235         } catch (HttpException e) {
236             throw new IOException(e);
237         }
238
239         return msgs;
240     }
241
242     private void readJsonData(LinkedList<String> msgs, JSONObject o) {
243         if (o != null) {
244             final JSONArray a = o.getJSONArray(JSON_RESULT);
245             if (a != null) {
246                 for (int i = 0; i < a.length(); i++) {
247                     if (a.get(i) instanceof String)
248                         msgs.add(a.getString(i));
249                     else
250                         msgs.add(a.getJSONObject(i).toString());
251                 }
252             }
253         }
254     }
255
256     @Override
257     public MRConsumerResponse fetchWithReturnConsumerResponse() {
258         // fetch with the timeout and limit set in constructor
259         return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
260     }
261
262     @Override
263     public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
264         final LinkedList<String> msgs = new LinkedList<>();
265         MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
266         try {
267             if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
268                 dmeConfigure(timeoutMs, limit);
269
270                 long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
271                         : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
272                 String reply = sender.sendAndWait(timeout);
273
274                 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
275
276                 readJsonData(msgs, o);
277                 createMRConsumerResponse(reply, mrConsumerResponse);
278             }
279
280             if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
281                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
282                         fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
283
284                 String response = getResponse(urlPath, username, password, protocolFlag);
285                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
286                 readJsonData(msgs, o);
287                 createMRConsumerResponse(response, mrConsumerResponse);
288             }
289
290             if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
291                 final String urlPath = createUrlPath(
292                         MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
293                         timeoutMs, limit);
294
295                 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
296                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
297                 readJsonData(msgs, o);
298                 createMRConsumerResponse(response, mrConsumerResponse);
299             }
300
301             if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
302                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
303                         fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
304
305                 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
306                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
307                 readJsonData(msgs, o);
308                 createMRConsumerResponse(response, mrConsumerResponse);
309             }
310
311         } catch (JSONException e) {
312             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
313             mrConsumerResponse.setResponseMessage(e.getMessage());
314             log.error("json exception: ", e);
315         } catch (HttpException e) {
316             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
317             mrConsumerResponse.setResponseMessage(e.getMessage());
318             log.error("http exception: ", e);
319         } catch (DME2Exception e) {
320             mrConsumerResponse.setResponseCode(e.getErrorCode());
321             mrConsumerResponse.setResponseMessage(e.getErrorMessage());
322             log.error("DME2 exception: ", e);
323         } catch (Exception e) {
324             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
325             mrConsumerResponse.setResponseMessage(e.getMessage());
326             log.error(EXECPTION_MESSAGE, e);
327         }
328         mrConsumerResponse.setActualMessages(msgs);
329         return mrConsumerResponse;
330     }
331
332     @Override
333     protected void reportProblemWithResponse() {
334         log.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
335         super.reportProblemWithResponse();
336         fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
337     }
338
339     private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
340         if (reply.startsWith("{")) {
341             JSONObject jObject = new JSONObject(reply);
342             String message = jObject.getString("message");
343             int status = jObject.getInt("status");
344
345             mrConsumerResponse.setResponseCode(Integer.toString(status));
346
347             if (null != message) {
348                 mrConsumerResponse.setResponseMessage(message);
349             }
350         } else if (reply.startsWith("<")) {
351             mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
352             mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
353         } else {
354             mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
355             mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
356         }
357     }
358
359     private JSONObject getResponseDataInJson(String response) {
360         try {
361             JSONTokener jsonTokener = new JSONTokener(response);
362             JSONObject jsonObject = null;
363             final char firstChar = jsonTokener.next();
364             jsonTokener.back();
365             if ('[' == firstChar) {
366                 JSONArray jsonArray = new JSONArray(jsonTokener);
367                 jsonObject = new JSONObject();
368                 jsonObject.put(JSON_RESULT, jsonArray);
369             } else {
370                 jsonObject = new JSONObject(jsonTokener);
371             }
372
373             return jsonObject;
374         } catch (JSONException excp) {
375             log.error("DMAAP - Error reading response data.", excp);
376             return null;
377         }
378     }
379
380     private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
381         JSONTokener jsonTokener = new JSONTokener(response);
382         JSONObject jsonObject = null;
383         final char firstChar = jsonTokener.next();
384         jsonTokener.back();
385         if (null != response && response.length() == 0) {
386             return null;
387         }
388
389         if ('[' == firstChar) {
390             JSONArray jsonArray = new JSONArray(jsonTokener);
391             jsonObject = new JSONObject();
392             jsonObject.put(JSON_RESULT, jsonArray);
393         } else if ('{' == firstChar) {
394             return null;
395         } else if ('<' == firstChar) {
396             return null;
397         } else {
398             jsonObject = new JSONObject(jsonTokener);
399         }
400
401         return jsonObject;
402     }
403
404     private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
405         this.longPollingMs = timeoutMs;
406         String latitude = props.getProperty("Latitude");
407         String longitude = props.getProperty("Longitude");
408         String version = props.getProperty("Version");
409         String serviceName = props.getProperty("ServiceName");
410         String env = props.getProperty("Environment");
411         String partner = props.getProperty("Partner");
412         String routeOffer = props.getProperty("routeOffer");
413         String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
414         String protocol = props.getProperty(PROPS_PROTOCOL);
415         String methodType = props.getProperty("MethodType");
416         String dmeuser = props.getProperty("username");
417         String dmepassword = props.getProperty("password");
418         String contenttype = props.getProperty("contenttype");
419         String handlers = props.getProperty("sessionstickinessrequired");
420
421         /**
422          * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
423          * provided use the routeOffer value for auto failover within a cluster
424          */
425
426         String preferredRouteKey = readRoute("preferredRouteKey");
427         StringBuilder contextUrl = new StringBuilder();
428         if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
429             contextUrl.append(protocol).append("://").append(serviceName).append("?")
430                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
431                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
432                     .append(URL_PARAM_PARTNER).append("=").append(partner).append("&")
433                     .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey);
434         } else if (partner != null && !partner.isEmpty()) {
435             contextUrl.append(protocol).append("://").append(serviceName).append("?")
436                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
437                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
438                     .append(URL_PARAM_PARTNER).append("=").append(partner);
439         } else if (routeOffer != null && !routeOffer.isEmpty()) {
440             contextUrl.append(protocol).append("://").append(serviceName).append("?")
441                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
442                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
443                     .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer);
444         }
445
446         if (timeoutMs != -1) {
447             contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs);
448         }
449         if (limit != -1) {
450             contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit);
451         }
452
453         // Add filter to DME2 Url
454         if (fFilter != null && fFilter.length() > 0) {
455             contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8"));
456         }
457
458         url = contextUrl.toString();
459
460         DMETimeOuts = new HashMap<>();
461         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
462         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
463         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
464         DMETimeOuts.put("Content-Type", contenttype);
465         System.setProperty("AFT_LATITUDE", latitude);
466         System.setProperty("AFT_LONGITUDE", longitude);
467         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
468
469         // SSL changes
470         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
471         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
472         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
473         // SSL changes
474
475         long dme2PerEndPointTimeoutMs;
476         try {
477             dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
478             // backward compatibility
479             if (dme2PerEndPointTimeoutMs <= 0) {
480                 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
481             }
482         } catch (NumberFormatException nfe) {
483             // backward compatibility
484             dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
485             getLog().debug(
486                     "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
487         }
488
489         try {
490             dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
491         } catch (NumberFormatException nfe) {
492             try {
493                 long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
494                 long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
495                 dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
496                 getLog().debug(
497                         "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
498                                 + dme2ReplyHandlerTimeoutMs);
499             } catch (NumberFormatException e) {
500                 // backward compatibility
501                 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
502                 getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
503             }
504         }
505         // backward compatibility
506         if (dme2ReplyHandlerTimeoutMs <= 0) {
507             dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
508         }
509
510         sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
511         sender.setAllowAllHttpReturnCodes(true);
512         sender.setMethod(methodType);
513         sender.setSubContext(subContextPath);
514         if (dmeuser != null && dmepassword != null) {
515             sender.setCredentials(dmeuser, dmepassword);
516         }
517         sender.setHeaders(DMETimeOuts);
518         sender.setPayload("");
519         if (handlers != null && handlers.equalsIgnoreCase("yes")) {
520             sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
521                     props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
522             sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
523             sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
524         } else {
525             sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
526         }
527     }
528
529     protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
530         final StringBuilder contexturl = new StringBuilder(url);
531         final StringBuilder adds = new StringBuilder();
532
533         if (timeoutMs > -1) {
534             adds.append("timeout=").append(timeoutMs);
535         }
536
537         if (limit > -1) {
538             if (adds.length() > 0) {
539                 adds.append("&");
540             }
541             adds.append("limit=").append(limit);
542         }
543
544         if (fFilter != null && fFilter.length() > 0) {
545             try {
546                 if (adds.length() > 0) {
547                     adds.append("&");
548                 }
549                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
550             } catch (UnsupportedEncodingException e) {
551                 log.error("exception at createUrlPath ()  :  ", e);
552             }
553         }
554
555         if (adds.length() > 0) {
556             contexturl.append("?").append(adds.toString());
557         }
558
559         return contexturl.toString();
560     }
561
562     private String readRoute(String routeKey) {
563         try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
564             MRClientFactory.prop.load(input);
565         } catch (Exception ex) {
566             log.error("Reply Router Error " + ex);
567         }
568         return MRClientFactory.prop.getProperty(routeKey);
569     }
570
571     public static List<String> stringToList(String str) {
572         final LinkedList<String> set = new LinkedList<>();
573         if (str != null) {
574             final String[] parts = str.trim().split(",");
575             for (String part : parts) {
576                 final String trimmed = part.trim();
577                 if (trimmed.length() > 0) {
578                     set.add(trimmed);
579                 }
580             }
581         }
582         return set;
583     }
584
585     public static String getRouterFilePath() {
586         return ROUTER_FILE_PATH;
587     }
588
589     public static void setRouterFilePath(String routerFilePath) {
590         MRSimplerBatchPublisher.routerFilePath = routerFilePath;
591     }
592
593     public String getConsumerFilePath() {
594         return consumerFilePath;
595     }
596
597     public void setConsumerFilePath(String consumerFilePath) {
598         this.consumerFilePath = consumerFilePath;
599     }
600
601     public String getProtocolFlag() {
602         return protocolFlag;
603     }
604
605     public void setProtocolFlag(String protocolFlag) {
606         this.protocolFlag = protocolFlag;
607     }
608
609     public Properties getProps() {
610         return props;
611     }
612
613     public void setProps(Properties props) {
614         this.props = props;
615         setClientConfig(DmaapClientUtil.getClientConfig(props));
616     }
617
618     public String getUsername() {
619         return username;
620     }
621
622     public void setUsername(String username) {
623         this.username = username;
624     }
625
626     public String getPassword() {
627         return password;
628     }
629
630     public void setPassword(String password) {
631         this.password = password;
632     }
633
634     public String getHost() {
635         return host;
636     }
637
638     public void setHost(String host) {
639         this.host = host;
640     }
641
642     public String getAuthKey() {
643         return authKey;
644     }
645
646     public void setAuthKey(String authKey) {
647         this.authKey = authKey;
648     }
649
650     public String getAuthDate() {
651         return authDate;
652     }
653
654     public void setAuthDate(String authDate) {
655         this.authDate = authDate;
656     }
657
658     public String getfFilter() {
659         return fFilter;
660     }
661
662     public void setfFilter(String fFilter) {
663         this.fFilter = fFilter;
664     }
665 }