[DMAAP-CLIENT] First sonar issues review part2
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Modifications Copyright © 2021 Orange.
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *
23  *******************************************************************************/
24
25 package org.onap.dmaap.mr.client.impl;
26
27 import com.att.aft.dme2.api.DME2Client;
28 import com.att.aft.dme2.api.DME2Exception;
29 import java.io.FileInputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.UnsupportedEncodingException;
33 import java.net.MalformedURLException;
34 import java.net.URI;
35 import java.net.URISyntaxException;
36 import java.net.URLEncoder;
37 import java.util.Collection;
38 import java.util.HashMap;
39 import java.util.LinkedList;
40 import java.util.List;
41 import java.util.Properties;
42 import java.util.concurrent.TimeUnit;
43 import org.apache.http.HttpException;
44 import org.apache.http.HttpStatus;
45 import org.json.JSONArray;
46 import org.json.JSONException;
47 import org.json.JSONObject;
48 import org.json.JSONTokener;
49 import org.onap.dmaap.mr.client.HostSelector;
50 import org.onap.dmaap.mr.client.MRClientFactory;
51 import org.onap.dmaap.mr.client.MRConsumer;
52 import org.onap.dmaap.mr.client.ProtocolType;
53 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
58
59     private static final Logger logger = LoggerFactory.getLogger(MRConsumerImpl.class);
60
61     public static final String ROUTER_FILE_PATH = null;
62
63     public String protocolFlag = ProtocolType.DME2.getValue();
64     public String consumerFilePath;
65
66     private static final String JSON_RESULT = "result";
67
68     private static final String EXECPTION_MESSAGE = "exception: ";
69     private static final String SUCCESS_MESSAGE = "Success";
70     private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
71     private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
72
73     private static final String URL_PARAM_ROUTE_OFFER = "routeoffer";
74     private static final String URL_PARAM_PARTNER = "partner";
75     private static final String URL_PARAM_ENV_CONTEXT = "envContext";
76     private static final String URL_PARAM_VERSION = "version";
77     private static final String URL_PARAM_FILTER = "filter";
78     private static final String URL_PARAM_LIMIT = "limit";
79     private static final String URL_PARAM_TIMEOUT = "timeout";
80
81     private static final String USERNAME = "username";
82     private static final String SERVICE_NAME = "ServiceName";
83     private static final String PARTNER = "Partner";
84     private static final String ROUTE_OFFER = "routeOffer";
85     private static final String PROTOCOL = "Protocol";
86     private static final String METHOD_TYPE = "MethodType";
87     private static final String CONTENT_TYPE = "contenttype";
88     private static final String LATITUDE = "Latitude";
89     private static final String LONGITUDE = "Longitude";
90     private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
91     private static final String VERSION = "Version";
92     private static final String ENVIRONMENT = "Environment";
93     private static final String SUB_CONTEXT_PATH = "SubContextPath";
94     private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
95     private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
96     private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
97     private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
98     private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
99     private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
100     private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
101     private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS";
102     private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS";
103
104     private final String fTopic;
105     private final String fGroup;
106     private final String fId;
107     private final int fTimeoutMs;
108     private final int fLimit;
109     private String fFilter;
110     private String username;
111     private String password;
112     private String host;
113     private HostSelector fHostSelector = null;
114     private String url;
115     private DME2Client sender;
116     private String authKey;
117     private String authDate;
118     private Properties props;
119     private HashMap<String, String> DMETimeOuts;
120     private long dme2ReplyHandlerTimeoutMs;
121     private long longPollingMs;
122
123     public MRConsumerImpl(MRConsumerImplBuilder builder) throws MalformedURLException {
124         super(builder.hostPart,
125                 builder.topic + "::" + builder.consumerGroup + "::" + builder.consumerId);
126
127         fTopic = builder.topic;
128         fGroup = builder.consumerGroup;
129         fId = builder.consumerId;
130         fTimeoutMs = builder.timeoutMs;
131         fLimit = builder.limit;
132         fFilter = builder.filter;
133
134         fHostSelector = new HostSelector(builder.hostPart);
135     }
136
137     public static class MRConsumerImplBuilder {
138         private Collection<String> hostPart;
139         private String topic;
140         private String consumerGroup;
141         private String consumerId;
142         private int timeoutMs;
143         private int limit;
144         private String filter;
145         private String apiKey_username;
146         private String apiSecret_password;
147         private String apiKey;
148         private String apiSecret;
149         private boolean allowSelfSignedCerts = false;
150
151         public MRConsumerImplBuilder setHostPart(Collection<String> hostPart) {
152             this.hostPart = hostPart;
153             return this;
154         }
155
156         public MRConsumerImplBuilder setTopic(String topic) {
157             this.topic = topic;
158             return this;
159         }
160
161         public MRConsumerImplBuilder setConsumerGroup(String consumerGroup) {
162             this.consumerGroup = consumerGroup;
163             return this;
164         }
165
166         public MRConsumerImplBuilder setConsumerId(String consumerId) {
167             this.consumerId = consumerId;
168             return this;
169         }
170
171         public MRConsumerImplBuilder setTimeoutMs(int timeoutMs) {
172             this.timeoutMs = timeoutMs;
173             return this;
174         }
175
176         public MRConsumerImplBuilder setLimit(int limit) {
177             this.limit = limit;
178             return this;
179         }
180
181         public MRConsumerImplBuilder setFilter(String filter) {
182             this.filter = filter;
183             return this;
184         }
185
186         public MRConsumerImplBuilder setApiKey_username(String apiKey_username) {
187             this.apiKey_username = apiKey_username;
188             return this;
189         }
190
191         public MRConsumerImplBuilder setApiSecret_password(String apiSecret_password) {
192             this.apiSecret_password = apiSecret_password;
193             return this;
194         }
195
196         public MRConsumerImplBuilder setApiKey(String apiKey) {
197             this.apiKey = apiKey;
198             return this;
199         }
200
201         public MRConsumerImplBuilder setApiSecret(String apiSecret) {
202             this.apiSecret = apiSecret;
203             return this;
204         }
205
206         public MRConsumerImplBuilder setAllowSelfSignedCerts(boolean allowSelfSignedCerts) {
207             this.allowSelfSignedCerts = allowSelfSignedCerts;
208             return this;
209         }
210
211         public MRConsumerImpl createMRConsumerImpl() throws MalformedURLException {
212             return new MRConsumerImpl(this);
213         }
214     }
215
216     @Override
217     public Iterable<String> fetch() throws IOException, Exception {
218         // fetch with the timeout and limit set in constructor
219         return fetch(fTimeoutMs, fLimit);
220     }
221
222     @Override
223     public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
224         final LinkedList<String> msgs = new LinkedList<>();
225
226         ProtocolType protocolFlagEnum = null;
227         for (ProtocolType type : ProtocolType.values()) {
228             if (type.getValue().equalsIgnoreCase(protocolFlag)) {
229                 protocolFlagEnum = type;
230             }
231         }
232         if (protocolFlagEnum == null) {
233             return msgs;
234         }
235
236         try {
237             switch (protocolFlagEnum) {
238                 case DME2:
239                     dmeConfigure(timeoutMs, limit);
240                     String reply = sender.sendAndWait(timeoutMs + 10000L);
241                     readJsonData(msgs, getResponseDataInJson(reply));
242                     break;
243                 case AAF_AUTH:
244                     String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
245                             fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
246                     final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
247                     readJsonData(msgs, o);
248                     break;
249                 case AUTH_KEY:
250                     final String urlKeyPath = createUrlPath(
251                             MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)),
252                             timeoutMs, limit);
253                     final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
254                     readJsonData(msgs, authObject);
255                     break;
256                 case HTTPNOAUTH:
257                     final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
258                             fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
259                     readJsonData(msgs, getNoAuth(urlNoAuthPath));
260                     break;
261             }
262         } catch (JSONException e) {
263             // unexpected response
264             reportProblemWithResponse();
265             logger.error(EXECPTION_MESSAGE, e);
266         } catch (HttpException e) {
267             throw new IOException(e);
268         }
269
270         return msgs;
271     }
272
273     private void readJsonData(LinkedList<String> msgs, JSONObject o) {
274         if (o != null) {
275             final JSONArray a = o.getJSONArray(JSON_RESULT);
276             if (a != null) {
277                 for (int i = 0; i < a.length(); i++) {
278                     if (a.get(i) instanceof String) {
279                         msgs.add(a.getString(i));
280                     } else {
281                         msgs.add(a.getJSONObject(i).toString());
282                     }
283                 }
284             }
285         }
286     }
287
288     @Override
289     public MRConsumerResponse fetchWithReturnConsumerResponse() {
290         // fetch with the timeout and limit set in constructor
291         return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
292     }
293
294     @Override
295     public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
296         final LinkedList<String> msgs = new LinkedList<>();
297         MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
298         try {
299             if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
300                 dmeConfigure(timeoutMs, limit);
301
302                 long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
303                         : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
304                 String reply = sender.sendAndWait(timeout);
305
306                 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
307
308                 readJsonData(msgs, o);
309                 createMRConsumerResponse(reply, mrConsumerResponse);
310             }
311
312             if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
313                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
314                         fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
315
316                 String response = getResponse(urlPath, username, password, protocolFlag);
317                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
318                 readJsonData(msgs, o);
319                 createMRConsumerResponse(response, mrConsumerResponse);
320             }
321
322             if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
323                 final String urlPath = createUrlPath(
324                         MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)),
325                         timeoutMs, limit);
326
327                 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
328                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
329                 readJsonData(msgs, o);
330                 createMRConsumerResponse(response, mrConsumerResponse);
331             }
332
333             if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
334                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
335                         fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
336
337                 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
338                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
339                 readJsonData(msgs, o);
340                 createMRConsumerResponse(response, mrConsumerResponse);
341             }
342
343         } catch (JSONException e) {
344             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
345             mrConsumerResponse.setResponseMessage(e.getMessage());
346             logger.error("json exception: ", e);
347         } catch (HttpException e) {
348             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
349             mrConsumerResponse.setResponseMessage(e.getMessage());
350             logger.error("http exception: ", e);
351         } catch (DME2Exception e) {
352             mrConsumerResponse.setResponseCode(e.getErrorCode());
353             mrConsumerResponse.setResponseMessage(e.getErrorMessage());
354             logger.error("DME2 exception: ", e);
355         } catch (Exception e) {
356             mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
357             mrConsumerResponse.setResponseMessage(e.getMessage());
358             logger.error(EXECPTION_MESSAGE, e);
359         }
360         mrConsumerResponse.setActualMessages(msgs);
361         return mrConsumerResponse;
362     }
363
364     @Override
365     protected void reportProblemWithResponse() {
366         logger.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
367         super.reportProblemWithResponse();
368         fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
369     }
370
371     private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
372         if (reply.startsWith("{")) {
373             JSONObject jsonObject = new JSONObject(reply);
374             String message = jsonObject.getString("message");
375             int status = jsonObject.getInt("status");
376
377             mrConsumerResponse.setResponseCode(Integer.toString(status));
378
379             if (null != message) {
380                 mrConsumerResponse.setResponseMessage(message);
381             }
382         } else if (reply.startsWith("<")) {
383             mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
384             mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
385         } else {
386             mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
387             mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
388         }
389     }
390
391     private JSONObject getResponseDataInJson(String response) {
392         try {
393             JSONTokener jsonTokener = new JSONTokener(response);
394             JSONObject jsonObject = null;
395             final char firstChar = jsonTokener.next();
396             jsonTokener.back();
397             if ('[' == firstChar) {
398                 JSONArray jsonArray = new JSONArray(jsonTokener);
399                 jsonObject = new JSONObject();
400                 jsonObject.put(JSON_RESULT, jsonArray);
401             } else {
402                 jsonObject = new JSONObject(jsonTokener);
403             }
404
405             return jsonObject;
406         } catch (JSONException excp) {
407             logger.error("DMAAP - Error reading response data.", excp);
408             return null;
409         }
410     }
411
412     private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
413         JSONTokener jsonTokener = new JSONTokener(response);
414         JSONObject jsonObject = null;
415         final char firstChar = jsonTokener.next();
416         jsonTokener.back();
417         if (null != response && response.length() == 0) {
418             return null;
419         }
420
421         if ('[' == firstChar) {
422             JSONArray jsonArray = new JSONArray(jsonTokener);
423             jsonObject = new JSONObject();
424             jsonObject.put(JSON_RESULT, jsonArray);
425         } else if ('{' == firstChar) {
426             return null;
427         } else if ('<' == firstChar) {
428             return null;
429         } else {
430             jsonObject = new JSONObject(jsonTokener);
431         }
432
433         return jsonObject;
434     }
435
436     private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
437         this.longPollingMs = timeoutMs;
438         String latitude = props.getProperty(LATITUDE);
439         String longitude = props.getProperty(LONGITUDE);
440         String version = props.getProperty(VERSION);
441         String serviceName = props.getProperty(SERVICE_NAME);
442         String env = props.getProperty(ENVIRONMENT);
443         String partner = props.getProperty(PARTNER);
444         String routeOffer = props.getProperty(ROUTE_OFFER);
445         String subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic + "/" + fGroup + "/" + fId;
446         String protocol = props.getProperty(PROTOCOL);
447         String methodType = props.getProperty(METHOD_TYPE);
448         String dmeuser = props.getProperty(USERNAME);
449         String dmepassword = props.getProperty(USERNAME);
450         String contenttype = props.getProperty(CONTENT_TYPE);
451         String handlers = props.getProperty(SESSION_STICKINESS_REQUIRED);
452
453         /*
454          * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
455          * provided use the routeOffer value for auto failover within a cluster
456          */
457
458         String preferredRouteKey = readRoute("preferredRouteKey");
459         StringBuilder contextUrl = new StringBuilder();
460         if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
461             contextUrl.append(protocol).append("://").append(serviceName).append("?")
462                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
463                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
464                     .append(URL_PARAM_PARTNER).append("=").append(partner).append("&")
465                     .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey);
466         } else if (partner != null && !partner.isEmpty()) {
467             contextUrl.append(protocol).append("://").append(serviceName).append("?")
468                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
469                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
470                     .append(URL_PARAM_PARTNER).append("=").append(partner);
471         } else if (routeOffer != null && !routeOffer.isEmpty()) {
472             contextUrl.append(protocol).append("://").append(serviceName).append("?")
473                     .append(URL_PARAM_VERSION).append("=").append(version).append("&")
474                     .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
475                     .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer);
476         }
477
478         if (timeoutMs != -1) {
479             contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs);
480         }
481         if (limit != -1) {
482             contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit);
483         }
484
485         // Add filter to DME2 Url
486         if (fFilter != null && fFilter.length() > 0) {
487             contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8"));
488         }
489
490         url = contextUrl.toString();
491
492         DMETimeOuts = new HashMap<>();
493         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
494         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
495         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
496         DMETimeOuts.put("Content-Type", contenttype);
497         System.setProperty("AFT_LATITUDE", latitude);
498         System.setProperty("AFT_LONGITUDE", longitude);
499         System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT));
500
501         // SSL changes
502         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
503         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
504         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
505         // SSL changes
506
507         long dme2PerEndPointTimeoutMs;
508         try {
509             dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty(DME2_PER_HANDLER_TIMEOUT_MS));
510             // backward compatibility
511             if (dme2PerEndPointTimeoutMs <= 0) {
512                 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
513             }
514         } catch (NumberFormatException nfe) {
515             // backward compatibility
516             dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
517             getLog().debug(
518                     DME2_PER_HANDLER_TIMEOUT_MS + " not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
519         }
520
521         try {
522             dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty(DME2_REPLY_HANDLER_TIMEOUT_MS));
523         } catch (NumberFormatException nfe) {
524             try {
525                 long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
526                 long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
527                 dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
528                 getLog().debug(
529                         "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
530                                 + dme2ReplyHandlerTimeoutMs);
531             } catch (NumberFormatException e) {
532                 // backward compatibility
533                 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
534                 getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
535             }
536         }
537         // backward compatibility
538         if (dme2ReplyHandlerTimeoutMs <= 0) {
539             dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
540         }
541
542         sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
543         sender.setAllowAllHttpReturnCodes(true);
544         sender.setMethod(methodType);
545         sender.setSubContext(subContextPath);
546         if (dmeuser != null && dmepassword != null) {
547             sender.setCredentials(dmeuser, dmepassword);
548         }
549         sender.setHeaders(DMETimeOuts);
550         sender.setPayload("");
551         if (handlers != null && handlers.equalsIgnoreCase("yes")) {
552             sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
553                     props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
554             sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS));
555             sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON));
556         } else {
557             sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
558         }
559     }
560
561     protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
562         final StringBuilder contexturl = new StringBuilder(url);
563         final StringBuilder adds = new StringBuilder();
564
565         if (timeoutMs > -1) {
566             adds.append("timeout=").append(timeoutMs);
567         }
568
569         if (limit > -1) {
570             if (adds.length() > 0) {
571                 adds.append("&");
572             }
573             adds.append("limit=").append(limit);
574         }
575
576         if (fFilter != null && fFilter.length() > 0) {
577             try {
578                 if (adds.length() > 0) {
579                     adds.append("&");
580                 }
581                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
582             } catch (UnsupportedEncodingException e) {
583                 logger.error("exception at createUrlPath ()  :  ", e);
584             }
585         }
586
587         if (adds.length() > 0) {
588             contexturl.append("?").append(adds.toString());
589         }
590
591         return contexturl.toString();
592     }
593
594     private String readRoute(String routeKey) {
595         try (InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
596             MRClientFactory.prop.load(input);
597         } catch (Exception ex) {
598             logger.error("Reply Router Error " + ex);
599         }
600         return MRClientFactory.prop.getProperty(routeKey);
601     }
602
603     public static List<String> stringToList(String str) {
604         final LinkedList<String> set = new LinkedList<>();
605         if (str != null) {
606             final String[] parts = str.trim().split(",");
607             for (String part : parts) {
608                 final String trimmed = part.trim();
609                 if (trimmed.length() > 0) {
610                     set.add(trimmed);
611                 }
612             }
613         }
614         return set;
615     }
616
617     public static String getRouterFilePath() {
618         return ROUTER_FILE_PATH;
619     }
620
621     public static void setRouterFilePath(String routerFilePath) {
622         MRSimplerBatchPublisher.routerFilePath = routerFilePath;
623     }
624
625     public String getConsumerFilePath() {
626         return consumerFilePath;
627     }
628
629     public void setConsumerFilePath(String consumerFilePath) {
630         this.consumerFilePath = consumerFilePath;
631     }
632
633     public String getProtocolFlag() {
634         return protocolFlag;
635     }
636
637     public void setProtocolFlag(String protocolFlag) {
638         this.protocolFlag = protocolFlag;
639     }
640
641     public Properties getProps() {
642         return props;
643     }
644
645     public void setProps(Properties props) {
646         this.props = props;
647         setClientConfig(DmaapClientUtil.getClientConfig(props));
648     }
649
650     public String getUsername() {
651         return username;
652     }
653
654     public void setUsername(String username) {
655         this.username = username;
656     }
657
658     public String getPassword() {
659         return password;
660     }
661
662     public void setPassword(String password) {
663         this.password = password;
664     }
665
666     public String getHost() {
667         return host;
668     }
669
670     public void setHost(String host) {
671         this.host = host;
672     }
673
674     public String getAuthKey() {
675         return authKey;
676     }
677
678     public void setAuthKey(String authKey) {
679         this.authKey = authKey;
680     }
681
682     public String getAuthDate() {
683         return authDate;
684     }
685
686     public void setAuthDate(String authDate) {
687         this.authDate = authDate;
688     }
689
690     public String getfFilter() {
691         return fFilter;
692     }
693
694     public void setfFilter(String fFilter) {
695         this.fFilter = fFilter;
696     }
697 }