1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * Modifications Copyright © 2021 Orange.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 *******************************************************************************/
26 package org.onap.dmaap.mr.client;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.TreeSet;
import java.util.UUID;
import javax.ws.rs.core.MultivaluedMap;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRMetaClient;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.tools.ValidatorUtil;
/**
 * A factory for MR clients.<br/>
 * <br/>
 * Take care when choosing a consumer factory method. If the method does not
 * accept a consumer group name, it creates a consumer that is not restartable:
 * if you stop your process and start it again, your client will NOT receive
 * any messages it missed on the topic. If you need to ensure receipt of missed
 * messages, you must use a consumer that is created with a group name and ID.
 * (If you create multiple consumer processes using the same group, load is
 * split across them. Be sure to use a different ID for each consumer process.)
 */
65 public class MRClientFactory {
67 private static MultivaluedMap<String, Object> httpHeadersMap;
68 public static Map<String, String> DME2HeadersMap;
69 public static String routeFilePath;
71 public static FileReader routeReader;
73 public static FileWriter routeWriter = null;
74 public static Properties prop = null;
/**
 * Hidden constructor: the class is used through its static members only.
 */
private MRClientFactory() {
    // Intentionally empty; being private is what prevents instantiation.
}
84 * Add getter to avoid direct access to static header map.
88 public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
89 return httpHeadersMap;
93 * Add setter to avoid direct access to static header map.
97 public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
98 httpHeadersMap = headers;
102 * Create a consumer instance with the default timeout and no limit on
103 * messages returned. This consumer operates as an independent consumer
104 * (i.e., not in a group) and is NOT re-startable across sessions.
106 * @param hostList A comma separated list of hosts to use to connect to MR. You
107 * can include port numbers (3904 is the default). For example,
109 * @param topic The topic to consume
112 public static MRConsumer createConsumer(String hostList, String topic) {
113 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
117 * Create a consumer instance with the default timeout and no limit on
118 * messages returned. This consumer operates as an independent consumer
119 * (i.e., not in a group) and is NOT re-startable across sessions.
121 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
122 * @param topic The topic to consume
125 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
126 return createConsumer(hostSet, topic, null);
130 * Create a consumer instance with server-side filtering, the default
131 * timeout, and no limit on messages returned. This consumer operates as an
132 * independent consumer (i.e., not in a group) and is NOT re-startable
135 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
136 * @param topic The topic to consume
137 * @param filter a filter to use on the server side
140 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
141 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
145 * Create a consumer instance with the default timeout, and no limit on
146 * messages returned. This consumer can operate in a logical group and is
147 * re-startable across sessions when you use the same group and ID on
150 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
151 * @param topic The topic to consume
152 * @param consumerGroup The name of the consumer group this consumer is part of
153 * @param consumerId The unique id of this consume in its group
156 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
157 final String consumerId) {
158 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
162 * Create a consumer instance with the default timeout, and no limit on
163 * messages returned. This consumer can operate in a logical group and is
164 * re-startable across sessions when you use the same group and ID on
167 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
168 * @param topic The topic to consume
169 * @param consumerGroup The name of the consumer group this consumer is part of
170 * @param consumerId The unique id of this consume in its group
171 * @param timeoutMs The amount of time in milliseconds that the server should keep
172 * the connection open while waiting for message traffic. Use -1
173 * for default timeout.
174 * @param limit A limit on the number of messages returned in a single call.
175 * Use -1 for no limit.
178 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
179 final String consumerId, int timeoutMs, int limit) {
180 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
184 * Create a consumer instance with the default timeout, and no limit on
185 * messages returned. This consumer can operate in a logical group and is
186 * re-startable across sessions when you use the same group and ID on
187 * restart. This consumer also uses server-side filtering.
189 * @param hostList A comma separated list of hosts to use to connect to MR. You
190 * can include port numbers (3904 is the default)"
191 * @param topic The topic to consume
192 * @param consumerGroup The name of the consumer group this consumer is part of
193 * @param consumerId The unique id of this consume in its group
194 * @param timeoutMs The amount of time in milliseconds that the server should keep
195 * the connection open while waiting for message traffic. Use -1
196 * for default timeout.
197 * @param limit A limit on the number of messages returned in a single call.
198 * Use -1 for no limit.
199 * @param filter A Highland Park filter expression using only built-in filter
200 * components. Use null for "no filter".
203 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
204 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
205 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
206 filter, apiKey, apiSecret);
210 * Create a consumer instance with the default timeout, and no limit on
211 * messages returned. This consumer can operate in a logical group and is
212 * re-startable across sessions when you use the same group and ID on
213 * restart. This consumer also uses server-side filtering.
215 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
216 * @param topic The topic to consume
217 * @param consumerGroup The name of the consumer group this consumer is part of
218 * @param consumerId The unique id of this consume in its group
219 * @param timeoutMs The amount of time in milliseconds that the server should keep
220 * the connection open while waiting for message traffic. Use -1
221 * for default timeout.
222 * @param limit A limit on the number of messages returned in a single call.
223 * Use -1 for no limit.
224 * @param filter A Highland Park filter expression using only built-in filter
225 * components. Use null for "no filter".
228 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
229 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
230 if (MRClientBuilders.sfConsumerMock != null) {
231 return MRClientBuilders.sfConsumerMock;
234 return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
235 .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
236 .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
237 .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
238 .createMRConsumerImpl();
239 } catch (MalformedURLException e) {
240 throw new IllegalArgumentException(e);
244 //*************************************************************************
245 //*************************************************************************
246 //*************************************************************************
249 * Create a publisher that sends each message (or group of messages)
250 * immediately. Most applications should favor higher latency for much
251 * higher message throughput and the "simple publisher" is not a good
254 * @param hostlist The host used in the URL to MR. Can be "host:port", can be
255 * multiple comma-separated entries.
256 * @param topic The topic on which to publish messages.
257 * @return a publisher
259 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
260 return createBatchingPublisher(hostlist, topic, 1, 1);
264 * Create a publisher that batches messages. Be sure to close the publisher
265 * to send the last batch and ensure a clean shutdown. Message payloads are
268 * @param hostlist The host used in the URL to MR. Can be "host:port", can be
269 * multiple comma-separated entries.
270 * @param topic The topic on which to publish messages.
271 * @param maxBatchSize The largest set of messages to batch
272 * @param maxAgeMs The maximum age of a message waiting in a batch
273 * @return a publisher
275 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
277 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
281 * Create a publisher that batches messages. Be sure to close the publisher
282 * to send the last batch and ensure a clean shutdown.
284 * @param hostlist The host used in the URL to MR. Can be "host:port", can be
285 * multiple comma-separated entries.
286 * @param topic The topic on which to publish messages.
287 * @param maxBatchSize The largest set of messages to batch
288 * @param maxAgeMs The maximum age of a message waiting in a batch
289 * @param compress use gzip compression
290 * @return a publisher
292 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
293 long maxAgeMs, boolean compress) {
294 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
298 * Create a publisher that batches messages. Be sure to close the publisher
299 * to send the last batch and ensure a clean shutdown.
301 * @param hostSet A set of hosts to be used in the URL to MR. Can be
302 * "host:port". Use multiple entries to enable failover.
303 * @param topic The topic on which to publish messages.
304 * @param maxBatchSize The largest set of messages to batch
305 * @param maxAgeMs The maximum age of a message waiting in a batch
306 * @param compress use gzip compression
307 * @return a publisher
309 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
310 long maxAgeMs, boolean compress) {
311 final TreeSet<String> hosts = new TreeSet<>();
312 Collections.addAll(hosts, hostSet);
313 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
317 * Create a publisher that batches messages. Be sure to close the publisher
318 * to send the last batch and ensure a clean shutdown.
320 * @param hostSet A set of hosts to be used in the URL to MR. Can be
321 * "host:port". Use multiple entries to enable failover.
322 * @param topic The topic on which to publish messages.
323 * @param maxBatchSize The largest set of messages to batch
324 * @param maxAgeMs The maximum age of a message waiting in a batch
325 * @param compress use gzip compression
326 * @return a publisher
328 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
329 int maxBatchSize, long maxAgeMs, boolean compress) {
330 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
331 .compress(compress).build();
335 * Create a publisher that batches messages. Be sure to close the publisher
336 * to send the last batch and ensure a clean shutdown.
338 * @param host A host to be used in the URL to MR. Can be "host:port". Use
339 * multiple entries to enable failover.
340 * @param topic The topic on which to publish messages.
341 * @param username username
342 * @param password password
343 * @param maxBatchSize The largest set of messages to batch
344 * @param maxAgeMs The maximum age of a message waiting in a batch
345 * @param compress use gzip compression
346 * @param protocolFlag http auth or ueb auth or dme2 method
347 * @return MRBatchingPublisher obj
349 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
350 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
351 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
352 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
353 .compress(compress).build();
356 pub.setUsername(username);
357 pub.setPassword(password);
358 pub.setProtocolFlag(protocolFlag);
363 * Create a publisher that batches messages. Be sure to close the publisher
364 * to send the last batch and ensure a clean shutdown
366 * @param props props set all properties for publishing message
367 * @return MRBatchingPublisher obj
368 * @throws FileNotFoundException exc
369 * @throws IOException ioex
371 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
372 throws FileNotFoundException, IOException {
373 return createInternalBatchingPublisher(props, withResponse);
377 * Create a publisher that batches messages. Be sure to close the publisher
378 * to send the last batch and ensure a clean shutdown
380 * @param props props set all properties for publishing message
381 * @return MRBatchingPublisher obj
382 * @throws FileNotFoundException exc
383 * @throws IOException ioex
385 public static MRBatchingPublisher createBatchingPublisher(Properties props)
386 throws FileNotFoundException, IOException {
387 return createInternalBatchingPublisher(props, false);
391 * Create a publisher that batches messages. Be sure to close the publisher
392 * to send the last batch and ensure a clean shutdown
394 * @param producerFilePath set all properties for publishing message
395 * @return MRBatchingPublisher obj
396 * @throws FileNotFoundException exc
397 * @throws IOException ioex
399 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
400 throws FileNotFoundException, IOException {
401 Properties props = new Properties();
402 try (InputStream input = new FileInputStream(producerFilePath)) {
405 return createBatchingPublisher(props);
409 * Create a publisher that will contain send methods that return response
412 * @param producerFilePath set all properties for publishing message
413 * @return MRBatchingPublisher obj
414 * @throws FileNotFoundException exc
415 * @throws IOException ioex
417 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
418 throws FileNotFoundException, IOException {
419 Properties props = new Properties();
420 try (InputStream input = new FileInputStream(producerFilePath)) {
423 return createBatchingPublisher(props, withResponse);
426 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
427 throws FileNotFoundException, IOException {
428 assert props != null;
429 MRSimplerBatchPublisher pub;
431 String messageSentThreadOccurrence = props.getProperty(DmaapClientConst.MESSAGE_SENT_THREAD_OCCURRENCE);
432 if (messageSentThreadOccurrence == null || messageSentThreadOccurrence.isEmpty()) {
433 messageSentThreadOccurrence = props.getProperty(DmaapClientConst.MESSAGE_SENT_THREAD_OCCURRENCE_OLD);
437 pub = new MRSimplerBatchPublisher.Builder()
438 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)), MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.SERVICE_NAME)), props.getProperty(DmaapClientConst.TRANSPORT_TYPE))
439 .onTopic(props.getProperty(DmaapClientConst.TOPIC))
440 .batchTo(Integer.parseInt(props.getProperty(DmaapClientConst.MAX_BATCH_SIZE)),
441 Integer.parseInt(props.getProperty(DmaapClientConst.MAX_AGE_MS)))
442 .compress(Boolean.parseBoolean(props.getProperty(DmaapClientConst.COMPRESS)))
443 .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence))
444 .withResponse(withResponse).build();
446 pub = new MRSimplerBatchPublisher.Builder()
447 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)), MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.SERVICE_NAME)), props.getProperty(DmaapClientConst.TRANSPORT_TYPE))
448 .onTopic(props.getProperty(DmaapClientConst.TOPIC))
449 .batchTo(Integer.parseInt(props.getProperty(DmaapClientConst.MAX_BATCH_SIZE)),
450 Integer.parseInt(props.getProperty(DmaapClientConst.MAX_AGE_MS)))
451 .compress(Boolean.parseBoolean(props.getProperty(DmaapClientConst.COMPRESS)))
452 .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)).build();
454 pub.setHost(props.getProperty(DmaapClientConst.HOST));
455 if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
457 pub.setAuthKey(props.getProperty(DmaapClientConst.AUTH_KEY));
458 pub.setAuthDate(props.getProperty(DmaapClientConst.AUTH_DATE));
459 pub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
460 pub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
462 pub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
463 pub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
465 pub.setProtocolFlag(props.getProperty(DmaapClientConst.TRANSPORT_TYPE));
467 prop = new Properties();
468 if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
469 routeFilePath = props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH);
470 routeReader = new FileReader(new File(routeFilePath));
471 File fo = new File(routeFilePath);
473 routeWriter = new FileWriter(new File(routeFilePath));
480 * Create an identity manager client to work with API keys.
482 * @param hostSet A set of hosts to be used in the URL to MR. Can be
483 * "host:port". Use multiple entries to enable failover.
484 * @param apiKey Your API key
485 * @param apiSecret Your API secret
486 * @return an identity manager
488 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
489 MRIdentityManager cim;
491 cim = new MRMetaClient(hostSet);
492 } catch (MalformedURLException e) {
493 throw new IllegalArgumentException(e);
495 cim.setApiCredentials(apiKey, apiSecret);
500 * Create a topic manager for working with topics.
502 * @param hostSet A set of hosts to be used in the URL to MR. Can be
503 * "host:port". Use multiple entries to enable failover.
504 * @param apiKey Your API key
505 * @param apiSecret Your API secret
506 * @return a topic manager
508 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
511 tmi = new MRMetaClient(hostSet);
512 } catch (MalformedURLException e) {
513 throw new IllegalArgumentException(e);
515 tmi.setApiCredentials(apiKey, apiSecret);
520 * Inject a consumer. Used to support unit tests.
524 public static void $testInject(MRConsumer cc) {
525 MRClientBuilders.sfConsumerMock = cc;
528 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
529 String id, int timeout, int limit, String protocalFlag, String consumerFilePath) {
533 sub = new MRConsumerImpl.MRConsumerImplBuilder()
534 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
535 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
536 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
537 .createMRConsumerImpl();
538 } catch (MalformedURLException e) {
539 throw new IllegalArgumentException(e);
541 sub.setUsername(username);
542 sub.setPassword(password);
544 sub.setProtocolFlag(protocalFlag);
545 sub.setConsumerFilePath(consumerFilePath);
550 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
551 String id, String protocalFlag, String consumerFilePath, int timeout, int limit) {
555 sub = new MRConsumerImpl.MRConsumerImplBuilder()
556 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
557 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
558 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
559 .createMRConsumerImpl();
560 } catch (MalformedURLException e) {
561 throw new IllegalArgumentException(e);
563 sub.setUsername(username);
564 sub.setPassword(password);
566 sub.setProtocolFlag(protocalFlag);
567 sub.setConsumerFilePath(consumerFilePath);
572 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
573 Properties props = new Properties();
574 try (InputStream input = new FileInputStream(consumerFilePath)) {
577 return createConsumer(props);
580 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
582 ValidatorUtil.validateSubscriber(props);
583 if (props.getProperty(DmaapClientConst.TIMEOUT) != null) {
584 timeout = Integer.parseInt(props.getProperty(DmaapClientConst.TIMEOUT));
589 if (props.getProperty(DmaapClientConst.LIMIT) != null) {
590 limit = Integer.parseInt(props.getProperty(DmaapClientConst.LIMIT));
595 if (props.getProperty(DmaapClientConst.GROUP) == null) {
596 group = UUID.randomUUID().toString();
598 group = props.getProperty(DmaapClientConst.GROUP);
600 MRConsumerImpl sub = null;
601 if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
602 sub = new MRConsumerImpl.MRConsumerImplBuilder()
603 .setHostPart(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)))
604 .setTopic(props.getProperty(DmaapClientConst.TOPIC)).setConsumerGroup(group)
605 .setConsumerId(props.getProperty(DmaapClientConst.ID)).setTimeoutMs(timeout).setLimit(limit)
606 .setFilter(props.getProperty(DmaapClientConst.FILTER))
607 .setApiKey_username(props.getProperty(DmaapClientConst.AUTH_KEY))
608 .setApiSecret_password(props.getProperty(DmaapClientConst.AUTH_DATE)).createMRConsumerImpl();
609 sub.setAuthKey(props.getProperty(DmaapClientConst.AUTH_KEY));
610 sub.setAuthDate(props.getProperty(DmaapClientConst.AUTH_DATE));
611 sub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
612 sub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
614 sub = new MRConsumerImpl.MRConsumerImplBuilder()
615 .setHostPart(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)))
616 .setTopic(props.getProperty(DmaapClientConst.TOPIC)).setConsumerGroup(group)
617 .setConsumerId(props.getProperty(DmaapClientConst.ID)).setTimeoutMs(timeout).setLimit(limit)
618 .setFilter(props.getProperty(DmaapClientConst.FILTER))
619 .setApiKey_username(props.getProperty(DmaapClientConst.USERNAME))
620 .setApiSecret_password(props.getProperty(DmaapClientConst.PASSWORD)).createMRConsumerImpl();
621 sub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
622 sub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
626 sub.setHost(props.getProperty(DmaapClientConst.HOST));
627 sub.setProtocolFlag(props.getProperty(DmaapClientConst.TRANSPORT_TYPE));
628 sub.setfFilter(props.getProperty(DmaapClientConst.FILTER));
629 if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
630 MRConsumerImpl.setRouterFilePath(props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH));
631 routeFilePath = props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH);
632 routeReader = new FileReader(new File(routeFilePath));
633 prop = new Properties();
634 File fo = new File(routeFilePath);
636 routeWriter = new FileWriter(new File(routeFilePath));