Sonar issues review, fix properties constant name.
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Modifications Copyright © 2021 Orange.
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *
23  *******************************************************************************/
24
25 package org.onap.dmaap.mr.client.impl;
26
27 import com.att.aft.dme2.api.DME2Client;
28 import com.att.aft.dme2.api.DME2Exception;
29 import java.io.FileInputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.UnsupportedEncodingException;
33 import java.net.MalformedURLException;
34 import java.net.URI;
35 import java.net.URISyntaxException;
36 import java.net.URLEncoder;
37 import java.nio.charset.StandardCharsets;
38 import java.util.Collection;
39 import java.util.HashMap;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Properties;
43 import java.util.concurrent.TimeUnit;
44 import org.apache.http.HttpException;
45 import org.apache.http.HttpStatus;
46 import org.json.JSONArray;
47 import org.json.JSONException;
48 import org.json.JSONObject;
49 import org.json.JSONTokener;
50 import org.onap.dmaap.mr.client.DmaapClientConst;
51 import org.onap.dmaap.mr.client.HostSelector;
52 import org.onap.dmaap.mr.client.MRClientFactory;
53 import org.onap.dmaap.mr.client.MRConsumer;
54 import org.onap.dmaap.mr.client.ProtocolType;
55 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
60
    private static final Logger logger = LoggerFactory.getLogger(MRConsumerImpl.class);

    // Always null; this is the value handed back by getRouterFilePath().
    public static final String ROUTER_FILE_PATH = null;

    // Transport selector, compared case-insensitively against ProtocolType values; defaults to DME2.
    public String protocolFlag = ProtocolType.DME2.getValue();
    // Plain holder exposed through getConsumerFilePath()/setConsumerFilePath(); not otherwise read in this class.
    public String consumerFilePath;

    // Key of the JSON array that readJsonData() iterates over.
    private static final String JSON_RESULT = "result";

    // Log prefix used for unexpected failures (the identifier spelling is historical).
    private static final String EXECPTION_MESSAGE = "exception: ";
    // Response message reported when a reply is neither a JSON object nor markup.
    private static final String SUCCESS_MESSAGE = "Success";
    // Added to the poll timeout when the per-handler timeout property is missing, non-numeric or not positive.
    private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
    // Added to the poll timeout when no usable reply-handler timeout can be derived from the properties.
    private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;

    // Query parameter names used when dmeConfigure() assembles the DME2 URL.
    private static final String URL_PARAM_ROUTE_OFFER = "routeoffer";
    private static final String URL_PARAM_PARTNER = "partner";
    private static final String URL_PARAM_ENV_CONTEXT = "envContext";
    private static final String URL_PARAM_VERSION = "version";
    private static final String URL_PARAM_FILTER = "filter";
    private static final String URL_PARAM_LIMIT = "limit";
    private static final String URL_PARAM_TIMEOUT = "timeout";
    
    // Header names added to the DME2 sender in dmeConfigure().
    private static final String HEADER_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
    private static final String HEADER_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
    private static final String HEADER_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";

    // Values copied from the builder in the constructor.
    private final String fTopic;
    private final String fGroup;
    private final String fId;
    private final int fTimeoutMs;
    private final int fLimit;
    // Initialised from the builder; may be replaced later through setfFilter().
    private String fFilter;
    // Credentials and addressing supplied through the setters; passed to the HTTP-based fetch branches.
    private String username;
    private String password;
    // Only used as the base host of the AUTH_KEY branch.
    private String host;
    // Built in the constructor from the builder's host list; provides the base host for AAF_AUTH and HTTPNOAUTH.
    private HostSelector fHostSelector = null;
    // The fields below (url, sender) are (re)assigned on every dmeConfigure() call.
    private String url;
    private DME2Client sender;
    private String authKey;
    private String authDate;
    // Client configuration; must be set through setProps() before any fetch, since every branch reads it.
    private Properties props;
    // Header map handed to the DME2 sender; rebuilt on every dmeConfigure() call.
    private HashMap<String, String> DMETimeOuts;
    // Reply-handler timeout computed by dmeConfigure().
    private long dme2ReplyHandlerTimeoutMs;
    // Poll timeout that was passed to the most recent dmeConfigure() call.
    private long longPollingMs;

105
106     public MRConsumerImpl(MRConsumerImplBuilder builder) throws MalformedURLException {
107         super(builder.hostPart,
108                 builder.topic + "::" + builder.consumerGroup + "::" + builder.consumerId);
109
110         fTopic = builder.topic;
111         fGroup = builder.consumerGroup;
112         fId = builder.consumerId;
113         fTimeoutMs = builder.timeoutMs;
114         fLimit = builder.limit;
115         fFilter = builder.filter;
116
117         fHostSelector = new HostSelector(builder.hostPart);
118     }
119
    /**
     * Fluent builder for {@link MRConsumerImpl}.
     *
     * <p>Every setter stores its argument and returns this builder so calls can be chained.
     * Only {@code hostPart}, {@code topic}, {@code consumerGroup}, {@code consumerId},
     * {@code timeoutMs}, {@code limit} and {@code filter} are read by the
     * {@link MRConsumerImpl} constructor in this file; the API key / secret and
     * self-signed-certificate values are stored here but not consumed by that constructor.
     */
    public static class MRConsumerImplBuilder {
        private Collection<String> hostPart;
        private String topic;
        private String consumerGroup;
        private String consumerId;
        private int timeoutMs;
        private int limit;
        private String filter;
        // The remaining fields are write-only as far as this file shows.
        private String apiKey_username;
        private String apiSecret_password;
        private String apiKey;
        private String apiSecret;
        private boolean allowSelfSignedCerts = false;

        /** Sets the collection of hosts passed to the base client and the host selector. */
        public MRConsumerImplBuilder setHostPart(Collection<String> hostPart) {
            this.hostPart = hostPart;
            return this;
        }

        /** Sets the topic to consume from. */
        public MRConsumerImplBuilder setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        /** Sets the consumer group. */
        public MRConsumerImplBuilder setConsumerGroup(String consumerGroup) {
            this.consumerGroup = consumerGroup;
            return this;
        }

        /** Sets the consumer id. */
        public MRConsumerImplBuilder setConsumerId(String consumerId) {
            this.consumerId = consumerId;
            return this;
        }

        /** Sets the default poll timeout in milliseconds used by the no-argument fetch methods. */
        public MRConsumerImplBuilder setTimeoutMs(int timeoutMs) {
            this.timeoutMs = timeoutMs;
            return this;
        }

        /** Sets the default message limit used by the no-argument fetch methods. */
        public MRConsumerImplBuilder setLimit(int limit) {
            this.limit = limit;
            return this;
        }

        /** Sets the filter that is URL-encoded into the request when non-empty. */
        public MRConsumerImplBuilder setFilter(String filter) {
            this.filter = filter;
            return this;
        }

        /** Stores the value; not read by the consumer constructor in this file. */
        public MRConsumerImplBuilder setApiKey_username(String apiKey_username) {
            this.apiKey_username = apiKey_username;
            return this;
        }

        /** Stores the value; not read by the consumer constructor in this file. */
        public MRConsumerImplBuilder setApiSecret_password(String apiSecret_password) {
            this.apiSecret_password = apiSecret_password;
            return this;
        }

        /** Stores the value; not read by the consumer constructor in this file. */
        public MRConsumerImplBuilder setApiKey(String apiKey) {
            this.apiKey = apiKey;
            return this;
        }

        /** Stores the value; not read by the consumer constructor in this file. */
        public MRConsumerImplBuilder setApiSecret(String apiSecret) {
            this.apiSecret = apiSecret;
            return this;
        }

        /** Stores the flag (default {@code false}); not read by the consumer constructor in this file. */
        public MRConsumerImplBuilder setAllowSelfSignedCerts(boolean allowSelfSignedCerts) {
            this.allowSelfSignedCerts = allowSelfSignedCerts;
            return this;
        }

        /**
         * Builds the consumer from the values set so far.
         *
         * @return a new {@link MRConsumerImpl}
         * @throws MalformedURLException propagated from the consumer constructor
         */
        public MRConsumerImpl createMRConsumerImpl() throws MalformedURLException {
            return new MRConsumerImpl(this);
        }
    }
198
    /**
     * Fetches messages using the timeout and limit supplied at construction time.
     *
     * @return the messages produced by {@link #fetch(int, int)}
     * @throws Exception whatever {@link #fetch(int, int)} throws
     */
    @Override
    public Iterable<String> fetch() throws IOException, Exception {
        // fetch with the timeout and limit set in constructor
        return fetch(fTimeoutMs, fLimit);
    }
204
205     @Override
206     public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
207         final LinkedList<String> msgs = new LinkedList<>();
208
209         ProtocolType protocolFlagEnum = null;
210         for (ProtocolType type : ProtocolType.values()) {
211             if (type.getValue().equalsIgnoreCase(protocolFlag)) {
212                 protocolFlagEnum = type;
213             }
214         }
215         if (protocolFlagEnum == null) {
216             return msgs;
217         }
218
219         try {
220             switch (protocolFlagEnum) {
221                 case DME2:
222                     dmeConfigure(timeoutMs, limit);
223                     String reply = sender.sendAndWait(timeoutMs + 10000L);
224                     readJsonData(msgs, getResponseDataInJson(reply));
225                     break;
226                 case AAF_AUTH:
227                     String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
228                             fGroup, fId, props.getProperty(DmaapClientConst.PROTOCOL)), timeoutMs, limit);
229                     final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
230                     readJsonData(msgs, o);
231                     break;
232                 case AUTH_KEY:
233                     final String urlKeyPath = createUrlPath(
234                             MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(DmaapClientConst.PROTOCOL)),
235                             timeoutMs, limit);
236                     final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
237                     readJsonData(msgs, authObject);
238                     break;
239                 case HTTPNOAUTH:
240                     final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
241                             fGroup, fId, props.getProperty(DmaapClientConst.PROTOCOL)), timeoutMs, limit);
242                     readJsonData(msgs, getNoAuth(urlNoAuthPath));
243                     break;
244             }
245         } catch (JSONException e) {
246             // unexpected response
247             reportProblemWithResponse();
248             logger.error(EXECPTION_MESSAGE, e);
249         } catch (HttpException e) {
250             throw new IOException(e);
251         }
252
253         return msgs;
254     }
255
256     private void readJsonData(LinkedList<String> msgs, JSONObject o) {
257         if (o != null) {
258             final JSONArray a = o.getJSONArray(JSON_RESULT);
259             if (a != null) {
260                 for (int i = 0; i < a.length(); i++) {
261                     if (a.get(i) instanceof String) {
262                         msgs.add(a.getString(i));
263                     } else {
264                         msgs.add(a.getJSONObject(i).toString());
265                     }
266                 }
267             }
268         }
269     }
270
    /**
     * Fetches messages using the timeout and limit supplied at construction time and
     * returns them together with the response code and message.
     *
     * @return the response produced by {@link #fetchWithReturnConsumerResponse(int, int)}
     */
    @Override
    public MRConsumerResponse fetchWithReturnConsumerResponse() {
        // fetch with the timeout and limit set in constructor
        return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
    }
276
277     @Override
278     public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
279         final LinkedList<String> msgs = new LinkedList<>();
280         MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
281         try {
282             if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
283                 dmeConfigure(timeoutMs, limit);
284
285                 long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
286                         : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
287                 String reply = sender.sendAndWait(timeout);
288
289                 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
290
291                 readJsonData(msgs, o);
292                 createMRConsumerResponse(reply, mrConsumerResponse);
293             }
294
295             if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
296                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
297                         fGroup, fId, props.getProperty(DmaapClientConst.PROTOCOL)), timeoutMs, limit);
298
299                 String response = getResponse(urlPath, username, password, protocolFlag);
300                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
301                 readJsonData(msgs, o);
302                 createMRConsumerResponse(response, mrConsumerResponse);
303             }
304
305             if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
306                 final String urlPath = createUrlPath(
307                         MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(DmaapClientConst.PROTOCOL)),
308                         timeoutMs, limit);
309
310                 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
311                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
312                 readJsonData(msgs, o);
313                 createMRConsumerResponse(response, mrConsumerResponse);
314             }
315
316             if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
317                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
318                         fGroup, fId, props.getProperty(DmaapClientConst.PROTOCOL)), timeoutMs, limit);
319
320                 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
321                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
322                 readJsonData(msgs, o);
323                 createMRConsumerResponse(response, mrConsumerResponse);
324             }
325
326         } catch (JSONException e) {
327             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
328             mrConsumerResponse.setResponseMessage(e.getMessage());
329             logger.error("json exception: ", e);
330         } catch (HttpException e) {
331             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
332             mrConsumerResponse.setResponseMessage(e.getMessage());
333             logger.error("http exception: ", e);
334         } catch (DME2Exception e) {
335             mrConsumerResponse.setResponseCode(e.getErrorCode());
336             mrConsumerResponse.setResponseMessage(e.getErrorMessage());
337             logger.error("DME2 exception: ", e);
338         } catch (Exception e) {
339             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
340             mrConsumerResponse.setResponseMessage(e.getMessage());
341             logger.error(EXECPTION_MESSAGE, e);
342         }
343         mrConsumerResponse.setActualMessages(msgs);
344         return mrConsumerResponse;
345     }
346
    /**
     * Logs a warning, delegates to the base-class handler and then reports a
     * reachability problem to the host selector with a duration of three minutes.
     */
    @Override
    protected void reportProblemWithResponse() {
        logger.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
        super.reportProblemWithResponse();
        fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
    }
353
354     private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
355         if (reply.startsWith("{")) {
356             JSONObject jsonObject = new JSONObject(reply);
357             String message = jsonObject.getString("message");
358             int status = jsonObject.getInt("status");
359
360             mrConsumerResponse.setResponseCode(Integer.toString(status));
361
362             if (null != message) {
363                 mrConsumerResponse.setResponseMessage(message);
364             }
365         } else if (reply.startsWith("<")) {
366             mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
367             mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
368         } else {
369             mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
370             mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
371         }
372     }
373
374     private JSONObject getResponseDataInJson(String response) {
375         try {
376             JSONTokener jsonTokener = new JSONTokener(response);
377             JSONObject jsonObject = null;
378             final char firstChar = jsonTokener.next();
379             jsonTokener.back();
380             if ('[' == firstChar) {
381                 JSONArray jsonArray = new JSONArray(jsonTokener);
382                 jsonObject = new JSONObject();
383                 jsonObject.put(JSON_RESULT, jsonArray);
384             } else {
385                 jsonObject = new JSONObject(jsonTokener);
386             }
387
388             return jsonObject;
389         } catch (JSONException excp) {
390             logger.error("DMAAP - Error reading response data.", excp);
391             return null;
392         }
393     }
394
395     private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
396         JSONTokener jsonTokener = new JSONTokener(response);
397         JSONObject jsonObject = null;
398         final char firstChar = jsonTokener.next();
399         jsonTokener.back();
400         if (null != response && response.length() == 0) {
401             return null;
402         }
403
404         if ('[' == firstChar) {
405             JSONArray jsonArray = new JSONArray(jsonTokener);
406             jsonObject = new JSONObject();
407             jsonObject.put(JSON_RESULT, jsonArray);
408         } else if ('{' == firstChar) {
409             return null;
410         } else if ('<' == firstChar) {
411             return null;
412         } else {
413             jsonObject = new JSONObject(jsonTokener);
414         }
415
416         return jsonObject;
417     }
418
419     private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
420         this.longPollingMs = timeoutMs;
421         String latitude = props.getProperty(DmaapClientConst.LATITUDE);
422         String longitude = props.getProperty(DmaapClientConst.LONGITUDE);
423         String version = props.getProperty(DmaapClientConst.VERSION);
424         String serviceName = props.getProperty(DmaapClientConst.SERVICE_NAME);
425         String env = props.getProperty(DmaapClientConst.ENVIRONMENT);
426         String partner = props.getProperty(DmaapClientConst.PARTNER, "");
427         String routeOffer = props.getProperty(DmaapClientConst.ROUTE_OFFER, "");
428         String subContextPath = props.getProperty(DmaapClientConst.SUB_CONTEXT_PATH) + fTopic + "/" + fGroup + "/" + fId;
429         String protocol = props.getProperty(DmaapClientConst.PROTOCOL);
430         String methodType = props.getProperty(DmaapClientConst.METHOD_TYPE);
431         String dmeuser = props.getProperty(DmaapClientConst.USERNAME);
432         String dmepassword = props.getProperty(DmaapClientConst.USERNAME);
433         String contenttype = props.getProperty(DmaapClientConst.CONTENT_TYPE);
434         String handlers = props.getProperty(DmaapClientConst.SESSION_STICKINESS_REQUIRED, "");
435
436         /*
437          * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
438          * provided use the routeOffer value for auto failover within a cluster
439          */
440
441         String preferredRouteKey = readRoute("preferredRouteKey");
442         StringBuilder contextUrl = new StringBuilder();
443         if (!partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
444             contextUrl.append(protocol).append("://").append(serviceName).append("?")
445                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
446                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
447                     .append(URL_PARAM_PARTNER).append("=").append(partner).append("&")
448                     .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey);
449         } else if (!partner.isEmpty()) {
450             contextUrl.append(protocol).append("://").append(serviceName).append("?")
451                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
452                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
453                     .append(URL_PARAM_PARTNER).append("=").append(partner);
454         } else if (!routeOffer.isEmpty()) {
455             contextUrl.append(protocol).append("://").append(serviceName).append("?")
456                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
457                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
458                     .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer);
459         }
460
461         if (timeoutMs != -1) {
462             contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs);
463         }
464         if (limit != -1) {
465             contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit);
466         }
467
468         // Add filter to DME2 Url
469         if (fFilter != null && fFilter.length() > 0) {
470             contextUrl.append("&").append(URL_PARAM_FILTER).append("=")
471                     .append(URLEncoder.encode(fFilter, StandardCharsets.UTF_8));
472         }
473
474         url = contextUrl.toString();
475
476         DMETimeOuts = new HashMap<>();
477         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(DmaapClientConst.AFT_DME2_EP_READ_TIMEOUT_MS));
478         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(DmaapClientConst.AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
479         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(DmaapClientConst.AFT_DME2_EP_CONN_TIMEOUT));
480         DMETimeOuts.put("Content-Type", contenttype);
481         System.setProperty("AFT_LATITUDE", latitude);
482         System.setProperty("AFT_LONGITUDE", longitude);
483         System.setProperty("AFT_ENVIRONMENT", props.getProperty(DmaapClientConst.AFT_ENVIRONMENT));
484
485         // SSL changes
486         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
487         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
488         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
489         // SSL changes
490
491         long dme2PerEndPointTimeoutMs;
492         try {
493             dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty(DmaapClientConst.DME2_PER_HANDLER_TIMEOUT_MS));
494             // backward compatibility
495             if (dme2PerEndPointTimeoutMs <= 0) {
496                 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
497             }
498         } catch (NumberFormatException nfe) {
499             // backward compatibility
500             dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
501             getLog().debug(
502                     DmaapClientConst.DME2_PER_HANDLER_TIMEOUT_MS + " not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
503         }
504
505         try {
506             dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty(DmaapClientConst.DME2_REPLY_HANDLER_TIMEOUT_MS));
507         } catch (NumberFormatException nfe) {
508             try {
509                 long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty(DmaapClientConst.AFT_DME2_EP_READ_TIMEOUT_MS));
510                 long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty(DmaapClientConst.AFT_DME2_EP_CONN_TIMEOUT));
511                 dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
512                 getLog().debug(
513                         "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
514                                 + dme2ReplyHandlerTimeoutMs);
515             } catch (NumberFormatException e) {
516                 // backward compatibility
517                 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
518                 getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
519             }
520         }
521         // backward compatibility
522         if (dme2ReplyHandlerTimeoutMs <= 0) {
523             dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
524         }
525
526         sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
527         sender.setAllowAllHttpReturnCodes(true);
528         sender.setMethod(methodType);
529         sender.setSubContext(subContextPath);
530         if (dmeuser != null && dmepassword != null) {
531             sender.setCredentials(dmeuser, dmepassword);
532         }
533         sender.setHeaders(DMETimeOuts);
534         sender.setPayload("");
535         if (handlers.equalsIgnoreCase("yes")) {
536             sender.addHeader(HEADER_DME2_EXCHANGE_REQUEST_HANDLERS,
537                     props.getProperty(DmaapClientConst.AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
538             sender.addHeader(HEADER_DME2_EXCHANGE_REPLY_HANDLERS, props.getProperty(DmaapClientConst.AFT_DME2_EXCHANGE_REPLY_HANDLERS));
539             sender.addHeader(HEADER_DME2_REQ_TRACE_ON, props.getProperty(DmaapClientConst.AFT_DME2_REQ_TRACE_ON));
540         } else {
541             sender.addHeader(HEADER_DME2_EXCHANGE_REPLY_HANDLERS, "com.att.nsa.mr.dme.client.HeaderReplyHandler");
542         }
543     }
544
545     protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
546         final StringBuilder contexturl = new StringBuilder(url);
547         final StringBuilder adds = new StringBuilder();
548
549         if (timeoutMs > -1) {
550             adds.append("timeout=").append(timeoutMs);
551         }
552
553         if (limit > -1) {
554             if (adds.length() > 0) {
555                 adds.append("&");
556             }
557             adds.append("limit=").append(limit);
558         }
559
560         if (fFilter != null && fFilter.length() > 0) {
561             try {
562                 if (adds.length() > 0) {
563                     adds.append("&");
564                 }
565                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
566             } catch (UnsupportedEncodingException e) {
567                 logger.error("exception at createUrlPath ()  :  ", e);
568             }
569         }
570
571         if (adds.length() > 0) {
572             contexturl.append("?").append(adds.toString());
573         }
574
575         return contexturl.toString();
576     }
577
578     private String readRoute(String routeKey) {
579         try (InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
580             MRClientFactory.prop.load(input);
581         } catch (Exception ex) {
582             logger.error("Reply Router Error " + ex);
583         }
584         return MRClientFactory.prop.getProperty(routeKey);
585     }
586
587     public static List<String> stringToList(String str) {
588         final LinkedList<String> set = new LinkedList<>();
589         if (str != null) {
590             final String[] parts = str.trim().split(",");
591             for (String part : parts) {
592                 final String trimmed = part.trim();
593                 if (trimmed.length() > 0) {
594                     set.add(trimmed);
595                 }
596             }
597         }
598         return set;
599     }
600
    /**
     * Returns the {@code ROUTER_FILE_PATH} constant, which is declared as {@code null}.
     *
     * <p>NOTE(review): this does not return the value stored by
     * {@link #setRouterFilePath(String)}, which writes to a field of
     * {@code MRSimplerBatchPublisher} instead — confirm whether that asymmetry is intended.
     */
    public static String getRouterFilePath() {
        return ROUTER_FILE_PATH;
    }

    /**
     * Stores the given path in {@code MRSimplerBatchPublisher.routerFilePath}.
     *
     * @param routerFilePath path to store
     */
    public static void setRouterFilePath(String routerFilePath) {
        MRSimplerBatchPublisher.routerFilePath = routerFilePath;
    }
608
    /** Returns the consumer file path last stored on this instance. */
    public String getConsumerFilePath() {
        return consumerFilePath;
    }

    /** Stores the consumer file path. */
    public void setConsumerFilePath(String consumerFilePath) {
        this.consumerFilePath = consumerFilePath;
    }

    /** Returns the transport flag that the fetch methods compare against {@code ProtocolType} values. */
    public String getProtocolFlag() {
        return protocolFlag;
    }

    /** Sets the transport flag; matching against {@code ProtocolType} values is case-insensitive. */
    public void setProtocolFlag(String protocolFlag) {
        this.protocolFlag = protocolFlag;
    }

    /** Returns the client properties last set through {@link #setProps(Properties)}. */
    public Properties getProps() {
        return props;
    }

    /**
     * Stores the client properties and also derives a client configuration from them,
     * which is handed to {@code setClientConfig}.
     */
    public void setProps(Properties props) {
        this.props = props;
        setClientConfig(DmaapClientUtil.getClientConfig(props));
    }

    /** Returns the user name passed to the HTTP-based fetch branches. */
    public String getUsername() {
        return username;
    }

    /** Sets the user name passed to the HTTP-based fetch branches. */
    public void setUsername(String username) {
        this.username = username;
    }

    /** Returns the password passed to the HTTP-based fetch branches. */
    public String getPassword() {
        return password;
    }

    /** Sets the password passed to the HTTP-based fetch branches. */
    public void setPassword(String password) {
        this.password = password;
    }

    /** Returns the host used as the base of the consumer URL in the AUTH_KEY branch. */
    public String getHost() {
        return host;
    }

    /** Sets the host used as the base of the consumer URL in the AUTH_KEY branch. */
    public void setHost(String host) {
        this.host = host;
    }

    /** Returns the auth key passed along in the AUTH_KEY branch. */
    public String getAuthKey() {
        return authKey;
    }

    /** Sets the auth key passed along in the AUTH_KEY branch. */
    public void setAuthKey(String authKey) {
        this.authKey = authKey;
    }

    /** Returns the auth date passed along in the AUTH_KEY branch. */
    public String getAuthDate() {
        return authDate;
    }

    /** Sets the auth date passed along in the AUTH_KEY branch. */
    public void setAuthDate(String authDate) {
        this.authDate = authDate;
    }

    /** Returns the filter that is URL-encoded into requests when non-empty. */
    public String getfFilter() {
        return fFilter;
    }

    /** Replaces the filter that is URL-encoded into requests when non-empty. */
    public void setfFilter(String fFilter) {
        this.fFilter = fFilter;
    }
681 }