1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2021 Orange.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
25 package org.onap.dmaap.mr.client.impl;
27 import com.att.aft.dme2.api.DME2Client;
28 import com.att.aft.dme2.api.DME2Exception;
29 import java.io.FileInputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.UnsupportedEncodingException;
33 import java.net.MalformedURLException;
35 import java.net.URISyntaxException;
36 import java.net.URLEncoder;
37 import java.util.Collection;
38 import java.util.HashMap;
39 import java.util.LinkedList;
40 import java.util.List;
41 import java.util.Properties;
42 import java.util.concurrent.TimeUnit;
43 import org.apache.http.HttpException;
44 import org.apache.http.HttpStatus;
45 import org.json.JSONArray;
46 import org.json.JSONException;
47 import org.json.JSONObject;
48 import org.json.JSONTokener;
49 import org.onap.dmaap.mr.client.HostSelector;
50 import org.onap.dmaap.mr.client.MRClientFactory;
51 import org.onap.dmaap.mr.client.MRConsumer;
52 import org.onap.dmaap.mr.client.ProtocolType;
53 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
/**
 * Message Router consumer client. Extends MRBaseClient and implements MRConsumer.
 * The transport used by a fetch is selected at call time by comparing protocolFlag
 * (case-insensitively) with the ProtocolType values.
 */
57 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
59 private static final Logger logger = LoggerFactory.getLogger(MRConsumerImpl.class);
// Constant null; getRouterFilePath() returns it unchanged.
61 public static final String ROUTER_FILE_PATH = null;
// Protocol selector; defaults to the DME2 value. Public and mutable (also exposed through accessors).
63 public String protocolFlag = ProtocolType.DME2.getValue();
64 public String consumerFilePath;
// Key of the message array inside the JSON response object.
66 private static final String JSON_RESULT = "result";
// NOTE(review): identifier is misspelled ("EXECPTION"); left as-is because other methods reference it.
68 private static final String EXECPTION_MESSAGE = "exception: ";
69 private static final String SUCCESS_MESSAGE = "Success";
// Milliseconds added to the caller's timeout when the matching DME2 property is missing or not positive.
70 private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
71 private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
// Query-parameter names appended to the DME2 URL in dmeConfigure.
73 private static final String URL_PARAM_ROUTE_OFFER = "routeoffer";
74 private static final String URL_PARAM_PARTNER = "partner";
75 private static final String URL_PARAM_ENV_CONTEXT = "envContext";
76 private static final String URL_PARAM_VERSION = "version";
77 private static final String URL_PARAM_FILTER = "filter";
78 private static final String URL_PARAM_LIMIT = "limit";
79 private static final String URL_PARAM_TIMEOUT = "timeout";
// Keys looked up in the client Properties object (props).
81 private static final String USERNAME = "username";
82 private static final String SERVICE_NAME = "ServiceName";
83 private static final String PARTNER = "Partner";
84 private static final String ROUTE_OFFER = "routeOffer";
85 private static final String PROTOCOL = "Protocol";
86 private static final String METHOD_TYPE = "MethodType";
87 private static final String CONTENT_TYPE = "contenttype";
88 private static final String LATITUDE = "Latitude";
89 private static final String LONGITUDE = "Longitude";
90 private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
91 private static final String VERSION = "Version";
92 private static final String ENVIRONMENT = "Environment";
93 private static final String SUB_CONTEXT_PATH = "SubContextPath";
94 private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
95 private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
96 private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
97 private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
98 private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
99 private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
100 private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
101 private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS";
102 private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS";
// Subscription identity and default fetch settings, copied from the builder in the constructor.
104 private final String fTopic;
105 private final String fGroup;
106 private final String fId;
107 private final int fTimeoutMs;
108 private final int fLimit;
// Optional filter; URL-encoded and sent as the "filter" query parameter when non-empty.
109 private String fFilter;
110 private String username;
111 private String password;
113 private HostSelector fHostSelector = null;
// DME2 client; re-created by every dmeConfigure call.
115 private DME2Client sender;
116 private String authKey;
117 private String authDate;
118 private Properties props;
// Header map handed to sender.setHeaders in dmeConfigure.
119 private HashMap<String, String> DMETimeOuts;
// Set by dmeConfigure; used by fetchWithReturnConsumerResponse to pick the sendAndWait timeout.
120 private long dme2ReplyHandlerTimeoutMs;
// Timeout value passed to the most recent dmeConfigure call.
121 private long longPollingMs;
/**
 * Creates a consumer from the values collected by the builder.
 * Passes the host collection and the string "topic::consumerGroup::consumerId" to the
 * superclass constructor, copies topic, group, id, timeout, limit and filter into this
 * instance, and creates a HostSelector over the same host collection.
 *
 * @param builder source of the configuration values
 * @throws MalformedURLException declared here; NOTE(review): presumably raised by the
 *         superclass constructor or HostSelector - confirm
 */
123 public MRConsumerImpl(MRConsumerImplBuilder builder) throws MalformedURLException {
124 super(builder.hostPart,
125 builder.topic + "::" + builder.consumerGroup + "::" + builder.consumerId);
127 fTopic = builder.topic;
128 fGroup = builder.consumerGroup;
129 fId = builder.consumerId;
130 fTimeoutMs = builder.timeoutMs;
131 fLimit = builder.limit;
132 fFilter = builder.filter;
134 fHostSelector = new HostSelector(builder.hostPart);
/**
 * Builder for MRConsumerImpl. Each setter returns the builder type so calls can be chained;
 * createMRConsumerImpl() passes the builder to the MRConsumerImpl constructor.
 * The constructor reads hostPart, topic, consumerGroup, consumerId, timeoutMs, limit and filter;
 * the apiKey/apiSecret/allowSelfSignedCerts values are stored here but are not read by the
 * constructor shown in this file.
 * NOTE(review): the "limit" field declaration and the bodies of setTopic/setLimit are not
 * visible in this view.
 */
137 public static class MRConsumerImplBuilder {
138 private Collection<String> hostPart;
139 private String topic;
140 private String consumerGroup;
141 private String consumerId;
142 private int timeoutMs;
144 private String filter;
145 private String apiKey_username;
146 private String apiSecret_password;
147 private String apiKey;
148 private String apiSecret;
149 private boolean allowSelfSignedCerts = false;
// Hosts handed to the superclass constructor and to HostSelector.
151 public MRConsumerImplBuilder setHostPart(Collection<String> hostPart) {
152 this.hostPart = hostPart;
156 public MRConsumerImplBuilder setTopic(String topic) {
161 public MRConsumerImplBuilder setConsumerGroup(String consumerGroup) {
162 this.consumerGroup = consumerGroup;
166 public MRConsumerImplBuilder setConsumerId(String consumerId) {
167 this.consumerId = consumerId;
// Default timeout used by the no-argument fetch methods.
171 public MRConsumerImplBuilder setTimeoutMs(int timeoutMs) {
172 this.timeoutMs = timeoutMs;
176 public MRConsumerImplBuilder setLimit(int limit) {
181 public MRConsumerImplBuilder setFilter(String filter) {
182 this.filter = filter;
186 public MRConsumerImplBuilder setApiKey_username(String apiKey_username) {
187 this.apiKey_username = apiKey_username;
191 public MRConsumerImplBuilder setApiSecret_password(String apiSecret_password) {
192 this.apiSecret_password = apiSecret_password;
196 public MRConsumerImplBuilder setApiKey(String apiKey) {
197 this.apiKey = apiKey;
201 public MRConsumerImplBuilder setApiSecret(String apiSecret) {
202 this.apiSecret = apiSecret;
206 public MRConsumerImplBuilder setAllowSelfSignedCerts(boolean allowSelfSignedCerts) {
207 this.allowSelfSignedCerts = allowSelfSignedCerts;
/**
 * Builds the consumer from the values set so far.
 *
 * @throws MalformedURLException propagated from the MRConsumerImpl constructor
 */
211 public MRConsumerImpl createMRConsumerImpl() throws MalformedURLException {
212 return new MRConsumerImpl(this);
/**
 * Fetches messages using the timeout and limit captured at construction time.
 *
 * @return whatever fetch(int, int) returns for fTimeoutMs and fLimit
 * @throws Exception propagated from fetch(int, int)
 */
217 public Iterable<String> fetch() throws IOException, Exception {
218 // fetch with the timeout and limit set in constructor
219 return fetch(fTimeoutMs, fLimit);
/**
 * Fetches messages over the transport selected by protocolFlag and collects them as strings.
 * NOTE(review): several structural lines of this method (the case labels, the try opener,
 * and the return statements) are not visible in this view; the inline notes below describe
 * only what the visible statements do.
 *
 * @param timeoutMs timeout forwarded to the transport (DME2 URL / HTTP query string)
 * @param limit message limit forwarded to the transport
 * @throws Exception an HttpException is rethrown wrapped in an IOException; a JSONException
 *         is logged after reportProblemWithResponse() is called
 */
223 public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
224 final LinkedList<String> msgs = new LinkedList<>();
// Resolve protocolFlag to a ProtocolType by case-insensitive comparison of getValue().
226 ProtocolType protocolFlagEnum = null;
227 for (ProtocolType type : ProtocolType.values()) {
228 if (type.getValue().equalsIgnoreCase(protocolFlag)) {
229 protocolFlagEnum = type;
// No matching protocol. NOTE(review): the body of this branch is not visible here.
232 if (protocolFlagEnum == null) {
237 switch (protocolFlagEnum) {
// DME2-style branch: configure the sender, then pass timeoutMs + 10000 to sendAndWait.
239 dmeConfigure(timeoutMs, limit);
240 String reply = sender.sendAndWait(timeoutMs + 10000L);
241 readJsonData(msgs, getResponseDataInJson(reply));
// Username/password branch (presumably AAF_AUTH - confirm): host comes from the HostSelector.
244 String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
245 fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
246 final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
247 readJsonData(msgs, o);
// Auth-key branch (presumably AUTH_KEY - confirm): uses the explicitly configured host field.
250 final String urlKeyPath = createUrlPath(
251 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)),
253 final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
254 readJsonData(msgs, authObject);
// Unauthenticated branch (presumably HTTPNOAUTH - confirm).
257 final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
258 fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
259 readJsonData(msgs, getNoAuth(urlNoAuthPath));
262 } catch (JSONException e) {
263 // unexpected response
264 reportProblemWithResponse();
265 logger.error(EXECPTION_MESSAGE, e);
266 } catch (HttpException e) {
// Wrap so callers only have to deal with IOException for transport failures.
267 throw new IOException(e);
/**
 * Appends every element of the "result" array of {@code o} to {@code msgs}.
 * String elements are added as-is; other elements are read as JSON objects and added in
 * their toString() form.
 * NOTE(review): any guard lines (e.g. null checks) are not visible in this view.
 *
 * @param msgs list that receives the messages
 * @param o parsed response object containing a "result" array
 */
273 private void readJsonData(LinkedList<String> msgs, JSONObject o) {
275 final JSONArray a = o.getJSONArray(JSON_RESULT);
277 for (int i = 0; i < a.length(); i++) {
278 if (a.get(i) instanceof String) {
279 msgs.add(a.getString(i));
281 msgs.add(a.getJSONObject(i).toString());
/**
 * Fetches messages using the timeout and limit captured at construction time and returns
 * them wrapped in an MRConsumerResponse.
 *
 * @return the result of fetchWithReturnConsumerResponse(fTimeoutMs, fLimit)
 */
289 public MRConsumerResponse fetchWithReturnConsumerResponse() {
290 // fetch with the timeout and limit set in constructor
291 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
295 public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
296 final LinkedList<String> msgs = new LinkedList<>();
297 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
299 if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
300 dmeConfigure(timeoutMs, limit);
302 long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
303 : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
304 String reply = sender.sendAndWait(timeout);
306 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
308 readJsonData(msgs, o);
309 createMRConsumerResponse(reply, mrConsumerResponse);
312 if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
313 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
314 fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
316 String response = getResponse(urlPath, username, password, protocolFlag);
317 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
318 readJsonData(msgs, o);
319 createMRConsumerResponse(response, mrConsumerResponse);
322 if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
323 final String urlPath = createUrlPath(
324 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROTOCOL)),
327 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
328 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
329 readJsonData(msgs, o);
330 createMRConsumerResponse(response, mrConsumerResponse);
333 if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
334 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
335 fGroup, fId, props.getProperty(PROTOCOL)), timeoutMs, limit);
337 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
338 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
339 readJsonData(msgs, o);
340 createMRConsumerResponse(response, mrConsumerResponse);
343 } catch (JSONException e) {
344 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
345 mrConsumerResponse.setResponseMessage(e.getMessage());
346 logger.error("json exception: ", e);
347 } catch (HttpException e) {
348 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
349 mrConsumerResponse.setResponseMessage(e.getMessage());
350 logger.error("http exception: ", e);
351 } catch (DME2Exception e) {
352 mrConsumerResponse.setResponseCode(e.getErrorCode());
353 mrConsumerResponse.setResponseMessage(e.getErrorMessage());
354 logger.error("DME2 exception: ", e);
355 } catch (Exception e) {
356 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
357 mrConsumerResponse.setResponseMessage(e.getMessage());
358 logger.error(EXECPTION_MESSAGE, e);
360 mrConsumerResponse.setActualMessages(msgs);
361 return mrConsumerResponse;
/**
 * Records a bad server response: logs a warning, delegates to the superclass
 * implementation, then reports a reachability problem to the host selector with a
 * duration of 3 minutes.
 */
365 protected void reportProblemWithResponse() {
366 logger.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
367 super.reportProblemWithResponse();
368 fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
/**
 * Fills the response code and message of {@code mrConsumerResponse} from the raw reply text.
 * A reply starting with "{" is parsed as a JSON object and its "status" and "message" fields
 * are used; a reply starting with "<" is passed to getHTTPErrorResponseCode /
 * getHTTPErrorResponseMessage; the remaining statements set 200 and "Success".
 * NOTE(review): the line introducing the final branch is not visible in this view;
 * presumably it is the else branch for all other replies.
 *
 * @param reply raw response body; dereferenced without a null check
 * @param mrConsumerResponse response object to populate
 */
371 private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
372 if (reply.startsWith("{")) {
373 JSONObject jsonObject = new JSONObject(reply);
374 String message = jsonObject.getString("message");
375 int status = jsonObject.getInt("status");
377 mrConsumerResponse.setResponseCode(Integer.toString(status));
379 if (null != message) {
380 mrConsumerResponse.setResponseMessage(message);
382 } else if (reply.startsWith("<")) {
383 mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
384 mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
386 mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
387 mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
/**
 * Converts a raw response string into a JSONObject.
 * When the first character is '[' the text is parsed as an array and wrapped in a new
 * object under the "result" key; the other visible assignment parses the text directly as
 * an object. A JSONException is logged.
 * NOTE(review): the return statements, and any tokener repositioning after next(), are not
 * visible in this view, so the value returned on failure cannot be confirmed from here.
 *
 * @param response raw response body
 */
391 private JSONObject getResponseDataInJson(String response) {
393 JSONTokener jsonTokener = new JSONTokener(response);
394 JSONObject jsonObject = null;
// Peek at the first character to distinguish an array payload from an object payload.
395 final char firstChar = jsonTokener.next();
397 if ('[' == firstChar) {
398 JSONArray jsonArray = new JSONArray(jsonTokener);
399 jsonObject = new JSONObject();
400 jsonObject.put(JSON_RESULT, jsonArray);
402 jsonObject = new JSONObject(jsonTokener);
406 } catch (JSONException excp) {
407 logger.error("DMAAP - Error reading response data.", excp);
/**
 * Variant of the response parser used by fetchWithReturnConsumerResponse.
 * Dispatches on the first character of the response: '[' is parsed as an array and wrapped
 * under the "result" key; '{' and '<' have their own branches; anything else is parsed as a
 * JSON object.
 * NOTE(review): the bodies of the empty-response, '{' and '<' branches and the final return
 * are not visible in this view.
 * NOTE(review): the {@code null != response} test comes after {@code response} has already
 * been handed to JSONTokener, so it cannot protect against a null argument.
 *
 * @param response raw response body
 */
412 private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
413 JSONTokener jsonTokener = new JSONTokener(response);
414 JSONObject jsonObject = null;
415 final char firstChar = jsonTokener.next();
// Empty response body.
417 if (null != response && response.length() == 0) {
421 if ('[' == firstChar) {
422 JSONArray jsonArray = new JSONArray(jsonTokener);
423 jsonObject = new JSONObject();
424 jsonObject.put(JSON_RESULT, jsonArray);
425 } else if ('{' == firstChar) {
427 } else if ('<' == firstChar) {
430 jsonObject = new JSONObject(jsonTokener);
436 private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
437 this.longPollingMs = timeoutMs;
438 String latitude = props.getProperty(LATITUDE);
439 String longitude = props.getProperty(LONGITUDE);
440 String version = props.getProperty(VERSION);
441 String serviceName = props.getProperty(SERVICE_NAME);
442 String env = props.getProperty(ENVIRONMENT);
443 String partner = props.getProperty(PARTNER);
444 String routeOffer = props.getProperty(ROUTE_OFFER);
445 String subContextPath = props.getProperty(SUB_CONTEXT_PATH) + fTopic + "/" + fGroup + "/" + fId;
446 String protocol = props.getProperty(PROTOCOL);
447 String methodType = props.getProperty(METHOD_TYPE);
448 String dmeuser = props.getProperty(USERNAME);
449 String dmepassword = props.getProperty(USERNAME);
450 String contenttype = props.getProperty(CONTENT_TYPE);
451 String handlers = props.getProperty(SESSION_STICKINESS_REQUIRED);
454 * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
455 * provided use the routeOffer value for auto failover within a cluster
458 String preferredRouteKey = readRoute("preferredRouteKey");
459 StringBuilder contextUrl = new StringBuilder();
460 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
461 contextUrl.append(protocol).append("://").append(serviceName).append("?")
462 .append(URL_PARAM_VERSION).append("=").append(version).append("&")
463 .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
464 .append(URL_PARAM_PARTNER).append("=").append(partner).append("&")
465 .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey);
466 } else if (partner != null && !partner.isEmpty()) {
467 contextUrl.append(protocol).append("://").append(serviceName).append("?")
468 .append(URL_PARAM_VERSION).append("=").append(version).append("&")
469 .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
470 .append(URL_PARAM_PARTNER).append("=").append(partner);
471 } else if (routeOffer != null && !routeOffer.isEmpty()) {
472 contextUrl.append(protocol).append("://").append(serviceName).append("?")
473 .append(URL_PARAM_VERSION).append("=").append(version).append("&")
474 .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
475 .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer);
478 if (timeoutMs != -1) {
479 contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs);
482 contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit);
485 // Add filter to DME2 Url
486 if (fFilter != null && fFilter.length() > 0) {
487 contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8"));
490 url = contextUrl.toString();
492 DMETimeOuts = new HashMap<>();
493 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
494 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
495 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
496 DMETimeOuts.put("Content-Type", contenttype);
497 System.setProperty("AFT_LATITUDE", latitude);
498 System.setProperty("AFT_LONGITUDE", longitude);
499 System.setProperty("AFT_ENVIRONMENT", props.getProperty(AFT_ENVIRONMENT));
502 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
503 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
504 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
507 long dme2PerEndPointTimeoutMs;
509 dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty(DME2_PER_HANDLER_TIMEOUT_MS));
510 // backward compatibility
511 if (dme2PerEndPointTimeoutMs <= 0) {
512 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
514 } catch (NumberFormatException nfe) {
515 // backward compatibility
516 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
518 DME2_PER_HANDLER_TIMEOUT_MS + " not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
522 dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty(DME2_REPLY_HANDLER_TIMEOUT_MS));
523 } catch (NumberFormatException nfe) {
525 long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_READ_TIMEOUT_MS));
526 long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty(AFT_DME2_EP_CONN_TIMEOUT));
527 dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
529 "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
530 + dme2ReplyHandlerTimeoutMs);
531 } catch (NumberFormatException e) {
532 // backward compatibility
533 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
534 getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
537 // backward compatibility
538 if (dme2ReplyHandlerTimeoutMs <= 0) {
539 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
542 sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
543 sender.setAllowAllHttpReturnCodes(true);
544 sender.setMethod(methodType);
545 sender.setSubContext(subContextPath);
546 if (dmeuser != null && dmepassword != null) {
547 sender.setCredentials(dmeuser, dmepassword);
549 sender.setHeaders(DMETimeOuts);
550 sender.setPayload("");
551 if (handlers != null && handlers.equalsIgnoreCase("yes")) {
552 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
553 props.getProperty(AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
554 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty(AFT_DME2_EXCHANGE_REPLY_HANDLERS));
555 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty(AFT_DME2_REQ_TRACE_ON));
557 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
/**
 * Appends the fetch query string to a consumer URL.
 * Adds "timeout=" when timeoutMs is greater than -1, then "limit=", then "filter=" (the
 * filter URL-encoded as UTF-8) when fFilter is non-empty; if anything was added it is joined
 * to the URL with "?".
 * NOTE(review): the condition guarding the "limit=" parameter and the "&" separator lines
 * are not visible in this view.
 * An UnsupportedEncodingException while encoding the filter is logged, not rethrown.
 *
 * @param url base consumer URL
 * @param timeoutMs timeout in milliseconds; values of -1 or less are omitted
 * @param limit message limit
 * @return the URL with the query string appended
 * @throws IOException declared; no visible statement in this view throws it
 */
561 protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
562 final StringBuilder contexturl = new StringBuilder(url);
// Query parameters are accumulated separately so "?" is added only when at least one exists.
563 final StringBuilder adds = new StringBuilder();
565 if (timeoutMs > -1) {
566 adds.append("timeout=").append(timeoutMs);
570 if (adds.length() > 0) {
573 adds.append("limit=").append(limit);
576 if (fFilter != null && fFilter.length() > 0) {
578 if (adds.length() > 0) {
581 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
582 } catch (UnsupportedEncodingException e) {
583 logger.error("exception at createUrlPath () : ", e);
587 if (adds.length() > 0) {
588 contexturl.append("?").append(adds.toString());
591 return contexturl.toString();
594 private String readRoute(String routeKey) {
595 try (InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
596 MRClientFactory.prop.load(input);
597 } catch (Exception ex) {
598 logger.error("Reply Router Error " + ex);
600 return MRClientFactory.prop.getProperty(routeKey);
603 public static List<String> stringToList(String str) {
604 final LinkedList<String> set = new LinkedList<>();
606 final String[] parts = str.trim().split(",");
607 for (String part : parts) {
608 final String trimmed = part.trim();
609 if (trimmed.length() > 0) {
/**
 * Returns the ROUTER_FILE_PATH constant, which this class declares as null.
 * NOTE(review): setRouterFilePath writes to MRSimplerBatchPublisher.routerFilePath, not to
 * this constant, so a value set there is not reflected here.
 */
617 public static String getRouterFilePath() {
618 return ROUTER_FILE_PATH;
/**
 * Stores the router file path in the static field MRSimplerBatchPublisher.routerFilePath.
 * NOTE(review): this class's getRouterFilePath() returns a different, constant value;
 * confirm the asymmetry is intended.
 *
 * @param routerFilePath path to assign
 */
621 public static void setRouterFilePath(String routerFilePath) {
622 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
// Instance accessors. NOTE(review): several bodies below are not visible in this view;
// comments describe only the statements that are shown.
625 public String getConsumerFilePath() {
626 return consumerFilePath;
629 public void setConsumerFilePath(String consumerFilePath) {
630 this.consumerFilePath = consumerFilePath;
633 public String getProtocolFlag() {
// Selects the transport used by the fetch methods (compared case-insensitively with ProtocolType values).
637 public void setProtocolFlag(String protocolFlag) {
638 this.protocolFlag = protocolFlag;
641 public Properties getProps() {
// Also derives the client configuration from the properties and passes it to setClientConfig.
645 public void setProps(Properties props) {
647 setClientConfig(DmaapClientUtil.getClientConfig(props));
650 public String getUsername() {
654 public void setUsername(String username) {
655 this.username = username;
658 public String getPassword() {
662 public void setPassword(String password) {
663 this.password = password;
666 public String getHost() {
670 public void setHost(String host) {
674 public String getAuthKey() {
678 public void setAuthKey(String authKey) {
679 this.authKey = authKey;
682 public String getAuthDate() {
686 public void setAuthDate(String authDate) {
687 this.authDate = authDate;
690 public String getfFilter() {
// Replaces the filter applied to subsequent fetches.
694 public void setfFilter(String fFilter) {
695 this.fFilter = fFilter;