85131290c38e7af33eea85e4194294f57af97f6a
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client;
23
24 import java.io.File;
25 import java.io.FileNotFoundException;
26 import java.io.FileReader;
27 import java.io.FileWriter;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.util.Collection;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.TreeSet;
34 import java.util.UUID;
35
36 import javax.ws.rs.core.MultivaluedMap;
37
38 import com.att.nsa.mr.client.impl.MRConsumerImpl;
39 import com.att.nsa.mr.client.impl.MRMetaClient;
40 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42
43 /**
44  * A factory for MR clients.<br/>
45  * <br/>
46  * Use caution selecting a consumer creator factory. If the call doesn't accept
47  * a consumer group name, then it creates a consumer that is not restartable.
48  * That is, if you stop your process and start it again, your client will NOT
49  * receive any missed messages on the topic. If you need to ensure receipt of
50  * missed messages, then you must use a consumer that's created with a group
51  * name and ID. (If you create multiple consumer processes using the same group,
52  * load is split across them. Be sure to use a different ID for each
53  * instance.)<br/>
54  * <br/>
55  * Publishers
56  * 
57  * @author author
58  */
59 public class MRClientFactory {
60         public static MultivaluedMap<String, Object> HTTPHeadersMap;
61         public static Map<String, String> DME2HeadersMap;
62         public static String routeFilePath;
63
64         public static FileReader routeReader;
65
66         public static FileWriter routeWriter = null;
67         public static Properties prop = null;
68
69         
70         // props= new Properties();
71         /**
72          * Create a consumer instance with the default timeout and no limit on
73          * messages returned. This consumer operates as an independent consumer
74          * (i.e., not in a group) and is NOT re-startable across sessions.
75          * 
76          * @param hostList
77          *            A comma separated list of hosts to use to connect to MR. You
78          *            can include port numbers (3904 is the default). For example,
79          *            "hostname:8080,"
80          * 
81          * @param topic
82          *            The topic to consume
83          * 
84          * @return a consumer
85          */
86         public static MRConsumer createConsumer(String hostList, String topic) {
87                 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
88         }
89
90         /**
91          * Create a consumer instance with the default timeout and no limit on
92          * messages returned. This consumer operates as an independent consumer
93          * (i.e., not in a group) and is NOT re-startable across sessions.
94          * 
95          * @param hostSet
96          *            The host used in the URL to MR. Entries can be "host:port".
97          * @param topic
98          *            The topic to consume
99          * 
100          * @return a consumer
101          */
102         public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
103                 return createConsumer(hostSet, topic, null);
104         }
105
106         /**
107          * Create a consumer instance with server-side filtering, the default
108          * timeout, and no limit on messages returned. This consumer operates as an
109          * independent consumer (i.e., not in a group) and is NOT re-startable
110          * across sessions.
111          * 
112          * @param hostSet
113          *            The host used in the URL to MR. Entries can be "host:port".
114          * @param topic
115          *            The topic to consume
116          * @param filter
117          *            a filter to use on the server side
118          * 
119          * @return a consumer
120          */
121         public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
122                 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
123         }
124
125         /**
126          * Create a consumer instance with the default timeout, and no limit on
127          * messages returned. This consumer can operate in a logical group and is
128          * re-startable across sessions when you use the same group and ID on
129          * restart.
130          * 
131          * @param hostSet
132          *            The host used in the URL to MR. Entries can be "host:port".
133          * @param topic
134          *            The topic to consume
135          * @param consumerGroup
136          *            The name of the consumer group this consumer is part of
137          * @param consumerId
138          *            The unique id of this consume in its group
139          * 
140          * @return a consumer
141          */
142         public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
143                         final String consumerId) {
144                 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
145         }
146
147         /**
148          * Create a consumer instance with the default timeout, and no limit on
149          * messages returned. This consumer can operate in a logical group and is
150          * re-startable across sessions when you use the same group and ID on
151          * restart.
152          * 
153          * @param hostSet
154          *            The host used in the URL to MR. Entries can be "host:port".
155          * @param topic
156          *            The topic to consume
157          * @param consumerGroup
158          *            The name of the consumer group this consumer is part of
159          * @param consumerId
160          *            The unique id of this consume in its group
161          * @param timeoutMs
162          *            The amount of time in milliseconds that the server should keep
163          *            the connection open while waiting for message traffic. Use -1
164          *            for default timeout.
165          * @param limit
166          *            A limit on the number of messages returned in a single call.
167          *            Use -1 for no limit.
168          * 
169          * @return a consumer
170          */
171         public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
172                         final String consumerId, int timeoutMs, int limit) {
173                 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
174         }
175
176         /**
177          * Create a consumer instance with the default timeout, and no limit on
178          * messages returned. This consumer can operate in a logical group and is
179          * re-startable across sessions when you use the same group and ID on
180          * restart. This consumer also uses server-side filtering.
181          * 
182          * @param hostList
183          *            A comma separated list of hosts to use to connect to MR. You
184          *            can include port numbers (3904 is the default). For example,
185          *            "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
186          * @param topic
187          *            The topic to consume
188          * @param consumerGroup
189          *            The name of the consumer group this consumer is part of
190          * @param consumerId
191          *            The unique id of this consume in its group
192          * @param timeoutMs
193          *            The amount of time in milliseconds that the server should keep
194          *            the connection open while waiting for message traffic. Use -1
195          *            for default timeout.
196          * @param limit
197          *            A limit on the number of messages returned in a single call.
198          *            Use -1 for no limit.
199          * @param filter
200          *            A Highland Park filter expression using only built-in filter
201          *            components. Use null for "no filter".
202          * 
203          * @return a consumer
204          */
205         public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
206                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
207                 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
208                                 filter, apiKey, apiSecret);
209         }
210
211         /**
212          * Create a consumer instance with the default timeout, and no limit on
213          * messages returned. This consumer can operate in a logical group and is
214          * re-startable across sessions when you use the same group and ID on
215          * restart. This consumer also uses server-side filtering.
216          * 
217          * @param hostSet
218          *            The host used in the URL to MR. Entries can be "host:port".
219          * @param topic
220          *            The topic to consume
221          * @param consumerGroup
222          *            The name of the consumer group this consumer is part of
223          * @param consumerId
224          *            The unique id of this consume in its group
225          * @param timeoutMs
226          *            The amount of time in milliseconds that the server should keep
227          *            the connection open while waiting for message traffic. Use -1
228          *            for default timeout.
229          * @param limit
230          *            A limit on the number of messages returned in a single call.
231          *            Use -1 for no limit.
232          * @param filter
233          *            A Highland Park filter expression using only built-in filter
234          *            components. Use null for "no filter".
235          * 
236          * @return a consumer
237          */
238         public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
239                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
240                 if (MRClientBuilders.sfConsumerMock != null)
241                         return MRClientBuilders.sfConsumerMock;
242                 try {
243                         return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
244                                         apiSecret);
245                 } catch (MalformedURLException e) {
246                         throw new IllegalArgumentException(e);
247                 }
248         }
249
250         /*************************************************************************/
251         /*************************************************************************/
252         /*************************************************************************/
253
254         /**
255          * Create a publisher that sends each message (or group of messages)
256          * immediately. Most applications should favor higher latency for much
257          * higher message throughput and the "simple publisher" is not a good
258          * choice.
259          * 
260          * @param hostlist
261          *            The host used in the URL to MR. Can be "host:port", can be
262          *            multiple comma-separated entries.
263          * @param topic
264          *            The topic on which to publish messages.
265          * @return a publisher
266          */
267         public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
268                 return createBatchingPublisher(hostlist, topic, 1, 1);
269         }
270
271         /**
272          * Create a publisher that batches messages. Be sure to close the publisher
273          * to send the last batch and ensure a clean shutdown. Message payloads are
274          * not compressed.
275          * 
276          * @param hostlist
277          *            The host used in the URL to MR. Can be "host:port", can be
278          *            multiple comma-separated entries.
279          * @param topic
280          *            The topic on which to publish messages.
281          * @param maxBatchSize
282          *            The largest set of messages to batch
283          * @param maxAgeMs
284          *            The maximum age of a message waiting in a batch
285          * 
286          * @return a publisher
287          */
288         public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
289                         long maxAgeMs) {
290                 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
291         }
292
293         /**
294          * Create a publisher that batches messages. Be sure to close the publisher
295          * to send the last batch and ensure a clean shutdown.
296          * 
297          * @param hostlist
298          *            The host used in the URL to MR. Can be "host:port", can be
299          *            multiple comma-separated entries.
300          * @param topic
301          *            The topic on which to publish messages.
302          * @param maxBatchSize
303          *            The largest set of messages to batch
304          * @param maxAgeMs
305          *            The maximum age of a message waiting in a batch
306          * @param compress
307          *            use gzip compression
308          * 
309          * @return a publisher
310          */
311         public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
312                         long maxAgeMs, boolean compress) {
313                 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
314         }
315
316         /**
317          * Create a publisher that batches messages. Be sure to close the publisher
318          * to send the last batch and ensure a clean shutdown.
319          * 
320          * @param hostSet
321          *            A set of hosts to be used in the URL to MR. Can be
322          *            "host:port". Use multiple entries to enable failover.
323          * @param topic
324          *            The topic on which to publish messages.
325          * @param maxBatchSize
326          *            The largest set of messages to batch
327          * @param maxAgeMs
328          *            The maximum age of a message waiting in a batch
329          * @param compress
330          *            use gzip compression
331          * 
332          * @return a publisher
333          */
334         public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
335                         long maxAgeMs, boolean compress) {
336                 final TreeSet<String> hosts = new TreeSet<String>();
337                 for (String hp : hostSet) {
338                         hosts.add(hp);
339                 }
340                 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
341         }
342
343         /**
344          * Create a publisher that batches messages. Be sure to close the publisher
345          * to send the last batch and ensure a clean shutdown.
346          * 
347          * @param hostSet
348          *            A set of hosts to be used in the URL to MR. Can be
349          *            "host:port". Use multiple entries to enable failover.
350          * @param topic
351          *            The topic on which to publish messages.
352          * @param maxBatchSize
353          *            The largest set of messages to batch
354          * @param maxAgeMs
355          *            The maximum age of a message waiting in a batch
356          * @param compress
357          *            use gzip compression
358          * 
359          * @return a publisher
360          */
361         public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
362                         int maxBatchSize, long maxAgeMs, boolean compress) {
363                 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
364                                 .compress(compress).build();
365         }
366
367         /**
368          * Create a publisher that batches messages. Be sure to close the publisher
369          * to send the last batch and ensure a clean shutdown.
370          * 
371          * @param host
372          *            A host to be used in the URL to MR. Can be "host:port". Use
373          *            multiple entries to enable failover.
374          * @param topic
375          *            The topic on which to publish messages.
376          * @param username
377          *            username
378          * @param password
379          *            password
380          * @param maxBatchSize
381          *            The largest set of messages to batch
382          * @param maxAgeMs
383          *            The maximum age of a message waiting in a batch
384          * @param compress
385          *            use gzip compression
386          * @param protocolFlag
387          *            http auth or ueb auth or dme2 method
388          * @param producerFilePath
389          *            all properties for publisher
390          * @return MRBatchingPublisher obj
391          */
392         public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
393                         final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
394                         String producerFilePath) {
395                 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
396                                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
397                                 .compress(compress).build();
398
399                 pub.setHost(host);
400                 pub.setUsername(username);
401                 pub.setPassword(password);
402                 pub.setProtocolFlag(protocolFlag);
403                 return pub;
404         }
405
406         /**
407          * Create a publisher that batches messages. Be sure to close the publisher
408          * to send the last batch and ensure a clean shutdown
409          * 
410          * @param Properties
411          *            props set all properties for publishing message
412          * @return MRBatchingPublisher obj
413          * @throws FileNotFoundException
414          *             exc
415          * @throws IOException
416          *             ioex
417          */
418         public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
419                         throws FileNotFoundException, IOException {
420                 return createInternalBatchingPublisher(props, withResponse);
421         }
422
423         /**
424          * Create a publisher that batches messages. Be sure to close the publisher
425          * to send the last batch and ensure a clean shutdown
426          * 
427          * @param Properties
428          *            props set all properties for publishing message
429          * @return MRBatchingPublisher obj
430          * @throws FileNotFoundException
431          *             exc
432          * @throws IOException
433          *             ioex
434          */
435         public static MRBatchingPublisher createBatchingPublisher(Properties props)
436                         throws FileNotFoundException, IOException {
437                 return createInternalBatchingPublisher(props, false);
438         }
439
440         /**
441          * Create a publisher that batches messages. Be sure to close the publisher
442          * to send the last batch and ensure a clean shutdown
443          * 
444          * @param producerFilePath
445          *            set all properties for publishing message
446          * @return MRBatchingPublisher obj
447          * @throws FileNotFoundException
448          *             exc
449          * @throws IOException
450          *             ioex
451          */
452         public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
453                         throws FileNotFoundException, IOException {
454                 FileReader reader = new FileReader(new File(producerFilePath));
455                 Properties props = new Properties();
456                 props.load(reader);
457                 return createBatchingPublisher(props);
458         }
459
460         /**
461          * Create a publisher that will contain send methods that return response
462          * object to user.
463          * 
464          * @param producerFilePath
465          *            set all properties for publishing message
466          * @return MRBatchingPublisher obj
467          * @throws FileNotFoundException
468          *             exc
469          * @throws IOException
470          *             ioex
471          */
472         public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
473                         throws FileNotFoundException, IOException {
474                 FileReader reader = new FileReader(new File(producerFilePath));
475                 Properties props = new Properties();
476                 props.load(reader);
477                 return createBatchingPublisher(props, withResponse);
478         }
479
480         protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
481                         throws FileNotFoundException, IOException {
482                 assert props != null;
483                 MRSimplerBatchPublisher pub;
484                 if (withResponse) {
485                         pub = new MRSimplerBatchPublisher.Builder()
486                                         .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
487                                         .onTopic(props.getProperty("topic"))
488                                         .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
489                                                         Integer.parseInt(props.getProperty("maxAgeMs").toString()))
490                                         .compress(Boolean.parseBoolean(props.getProperty("compress")))
491                                         .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
492                                         .withResponse(withResponse).build();
493                 } else {
494                         pub = new MRSimplerBatchPublisher.Builder()
495                                         .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
496                                         .onTopic(props.getProperty("topic"))
497                                         .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
498                                                         Integer.parseInt(props.getProperty("maxAgeMs").toString()))
499                                         .compress(Boolean.parseBoolean(props.getProperty("compress")))
500                                         .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
501                 }
502                 pub.setHost(props.getProperty("host"));
503                 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
504
505                         pub.setAuthKey(props.getProperty("authKey"));
506                         pub.setAuthDate(props.getProperty("authDate"));
507                         pub.setUsername(props.getProperty("username"));
508                         pub.setPassword(props.getProperty("password"));
509                 } else {
510                         pub.setUsername(props.getProperty("username"));
511                         pub.setPassword(props.getProperty("password"));
512                 }
513                 pub.setProtocolFlag(props.getProperty("TransportType"));
514                 pub.setProps(props);
515                 routeFilePath = props.getProperty("DME2preferredRouterFilePath");
516                 routeReader = new FileReader(new File(routeFilePath));
517                 prop = new Properties();
518                 File fo = new File(routeFilePath);
519                 if (!fo.exists()) {
520                         routeWriter = new FileWriter(new File(routeFilePath));
521                 }
522                 return pub;
523         }
524
525         /**
526          * Create an identity manager client to work with API keys.
527          * 
528          * @param hostSet
529          *            A set of hosts to be used in the URL to MR. Can be
530          *            "host:port". Use multiple entries to enable failover.
531          * @param apiKey
532          *            Your API key
533          * @param apiSecret
534          *            Your API secret
535          * @return an identity manager
536          */
537         public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
538                 MRIdentityManager cim;
539                 try {
540                         cim = new MRMetaClient(hostSet);
541                 } catch (MalformedURLException e) {
542                         throw new IllegalArgumentException(e);
543                 }
544                 cim.setApiCredentials(apiKey, apiSecret);
545                 return cim;
546         }
547
548         /**
549          * Create a topic manager for working with topics.
550          * 
551          * @param hostSet
552          *            A set of hosts to be used in the URL to MR. Can be
553          *            "host:port". Use multiple entries to enable failover.
554          * @param apiKey
555          *            Your API key
556          * @param apiSecret
557          *            Your API secret
558          * @return a topic manager
559          */
560         public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
561                 MRMetaClient tmi;
562                 try {
563                         tmi = new MRMetaClient(hostSet);
564                 } catch (MalformedURLException e) {
565                         throw new RuntimeException(e);
566                 }
567                 tmi.setApiCredentials(apiKey, apiSecret);
568                 return tmi;
569         }
570
571         /**
572          * Inject a consumer. Used to support unit tests.
573          * 
574          * @param cc
575          */
576         public static void $testInject(MRConsumer cc) {
577                 MRClientBuilders.sfConsumerMock = cc;
578         }
579
580         public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
581                         String id, int i, int j, String protocalFlag, String consumerFilePath) {
582
583                 MRConsumerImpl sub;
584                 try {
585                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
586                 } catch (MalformedURLException e) {
587                         throw new IllegalArgumentException(e);
588                 }
589                 sub.setUsername(username);
590                 sub.setPassword(password);
591                 sub.setHost(host);
592                 sub.setProtocolFlag(protocalFlag);
593                 sub.setConsumerFilePath(consumerFilePath);
594                 return sub;
595
596         }
597
598         public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
599                         String id, String protocalFlag, String consumerFilePath, int i, int j) {
600
601                 MRConsumerImpl sub;
602                 try {
603                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
604                 } catch (MalformedURLException e) {
605                         throw new RuntimeException(e);
606                 }
607                 sub.setUsername(username);
608                 sub.setPassword(password);
609                 sub.setHost(host);
610                 sub.setProtocolFlag(protocalFlag);
611                 sub.setConsumerFilePath(consumerFilePath);
612                 return sub;
613
614         }
615
616         public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
617                 FileReader reader = new FileReader(new File(consumerFilePath));
618                 Properties props = new Properties();
619                 props.load(reader);
620
621                 return createConsumer(props);
622         }
623
624         public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
625                 int timeout;
626                 if (props.getProperty("timeout") != null)
627                         timeout = Integer.parseInt(props.getProperty("timeout"));
628                 else
629                         timeout = -1;
630                 int limit;
631                 if (props.getProperty("limit") != null)
632                         limit = Integer.parseInt(props.getProperty("limit"));
633                 else
634                         limit = -1;
635                 String group;
636                 if (props.getProperty("group") == null)
637                         group = UUID.randomUUID().toString();
638                 else
639                         group = props.getProperty("group");
640                 MRConsumerImpl sub = null;
641                 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
642                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
643                                         group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
644                                         props.getProperty("authKey"), props.getProperty("authDate"));
645                         sub.setAuthKey(props.getProperty("authKey"));
646                         sub.setAuthDate(props.getProperty("authDate"));
647                         sub.setUsername(props.getProperty("username"));
648                         sub.setPassword(props.getProperty("password"));
649                 } else {
650                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
651                                         group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
652                                         props.getProperty("username"), props.getProperty("password"));
653                         sub.setUsername(props.getProperty("username"));
654                         sub.setPassword(props.getProperty("password"));
655                 }
656                 sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
657                 sub.setProps(props);
658                 sub.setHost(props.getProperty("host"));
659                 sub.setProtocolFlag(props.getProperty("TransportType"));
660                 sub.setfFilter(props.getProperty("filter"));
661                 routeFilePath = props.getProperty("DME2preferredRouterFilePath");
662                 routeReader = new FileReader(new File(routeFilePath));
663                 prop = new Properties();
664                 File fo = new File(routeFilePath);
665                 if (!fo.exists()) {
666                         routeWriter = new FileWriter(new File(routeFilePath));
667                 }
668                 return sub;
669         }
670 }