Sonar majior issues
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  * ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *  
23  *******************************************************************************/
24 package org.onap.dmaap.mr.client;
25
26 import java.io.File;
27 import java.io.FileNotFoundException;
28 import java.io.FileReader;
29 import java.io.FileWriter;
30 import java.io.IOException;
31 import java.net.MalformedURLException;
32 import java.util.Collection;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.TreeSet;
36 import java.util.UUID;
37
38 import javax.ws.rs.core.MultivaluedMap;
39
40 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
41 import org.onap.dmaap.mr.client.impl.MRMetaClient;
42 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
43 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
44 import org.onap.dmaap.mr.tools.ValidatorUtil;
45
46 /**
47  * A factory for MR clients.<br/>
48  * <br/>
49  * Use caution selecting a consumer creator factory. If the call doesn't accept
50  * a consumer group name, then it creates a consumer that is not restartable.
51  * That is, if you stop your process and start it again, your client will NOT
52  * receive any missed messages on the topic. If you need to ensure receipt of
53  * missed messages, then you must use a consumer that's created with a group
54  * name and ID. (If you create multiple consumer processes using the same group,
55  * load is split across them. Be sure to use a different ID for each
56  * instance.)<br/>
57  * <br/>
58  * Publishers
59  * 
60  * @author author
61  */
62 public class MRClientFactory {
63     private static final String AUTH_KEY = "authKey";
64     private static final String AUTH_DATE = "authDate";
65     private static final String PASSWORD = "password";
66     private static final String USERNAME = "username";
67     private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
68     private static final String TOPIC = "topic";
69     private static final String TRANSPORT_TYPE = "TransportType";
70     public static MultivaluedMap<String, Object> HTTPHeadersMap;
71     public static Map<String, String> DME2HeadersMap;
72     public static String routeFilePath;
73
74     public static FileReader routeReader;
75
76     public static FileWriter routeWriter = null;
77     public static Properties prop = null;
78
79     /**
80      * Instantiates MRClientFactory.
81      */
82     private MRClientFactory() {
83         //prevents instantiation.
84     }
85     
86     /**
87      * Create a consumer instance with the default timeout and no limit on
88      * messages returned. This consumer operates as an independent consumer
89      * (i.e., not in a group) and is NOT re-startable across sessions.
90      * 
91      * @param hostList
92      *            A comma separated list of hosts to use to connect to MR. You
93      *            can include port numbers (3904 is the default). For example,
94      *            "hostname:8080,"
95      * 
96      * @param topic
97      *            The topic to consume
98      * 
99      * @return a consumer
100      */
101     public static MRConsumer createConsumer(String hostList, String topic) {
102         return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
103     }
104
105     /**
106      * Create a consumer instance with the default timeout and no limit on
107      * messages returned. This consumer operates as an independent consumer
108      * (i.e., not in a group) and is NOT re-startable across sessions.
109      * 
110      * @param hostSet
111      *            The host used in the URL to MR. Entries can be "host:port".
112      * @param topic
113      *            The topic to consume
114      * 
115      * @return a consumer
116      */
117     public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
118         return createConsumer(hostSet, topic, null);
119     }
120
121     /**
122      * Create a consumer instance with server-side filtering, the default
123      * timeout, and no limit on messages returned. This consumer operates as an
124      * independent consumer (i.e., not in a group) and is NOT re-startable
125      * across sessions.
126      * 
127      * @param hostSet
128      *            The host used in the URL to MR. Entries can be "host:port".
129      * @param topic
130      *            The topic to consume
131      * @param filter
132      *            a filter to use on the server side
133      * 
134      * @return a consumer
135      */
136     public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
137         return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
138     }
139
140     /**
141      * Create a consumer instance with the default timeout, and no limit on
142      * messages returned. This consumer can operate in a logical group and is
143      * re-startable across sessions when you use the same group and ID on
144      * restart.
145      * 
146      * @param hostSet
147      *            The host used in the URL to MR. Entries can be "host:port".
148      * @param topic
149      *            The topic to consume
150      * @param consumerGroup
151      *            The name of the consumer group this consumer is part of
152      * @param consumerId
153      *            The unique id of this consume in its group
154      * 
155      * @return a consumer
156      */
157     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
158             final String consumerId) {
159         return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
160     }
161
162     /**
163      * Create a consumer instance with the default timeout, and no limit on
164      * messages returned. This consumer can operate in a logical group and is
165      * re-startable across sessions when you use the same group and ID on
166      * restart.
167      * 
168      * @param hostSet
169      *            The host used in the URL to MR. Entries can be "host:port".
170      * @param topic
171      *            The topic to consume
172      * @param consumerGroup
173      *            The name of the consumer group this consumer is part of
174      * @param consumerId
175      *            The unique id of this consume in its group
176      * @param timeoutMs
177      *            The amount of time in milliseconds that the server should keep
178      *            the connection open while waiting for message traffic. Use -1
179      *            for default timeout.
180      * @param limit
181      *            A limit on the number of messages returned in a single call.
182      *            Use -1 for no limit.
183      * 
184      * @return a consumer
185      */
186     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
187             final String consumerId, int timeoutMs, int limit) {
188         return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
189     }
190
191     /**
192      * Create a consumer instance with the default timeout, and no limit on
193      * messages returned. This consumer can operate in a logical group and is
194      * re-startable across sessions when you use the same group and ID on
195      * restart. This consumer also uses server-side filtering.
196      * 
197      * @param hostList
198      *            A comma separated list of hosts to use to connect to MR. You
199      *            can include port numbers (3904 is the default). For example,
200      *            "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
201      * @param topic
202      *            The topic to consume
203      * @param consumerGroup
204      *            The name of the consumer group this consumer is part of
205      * @param consumerId
206      *            The unique id of this consume in its group
207      * @param timeoutMs
208      *            The amount of time in milliseconds that the server should keep
209      *            the connection open while waiting for message traffic. Use -1
210      *            for default timeout.
211      * @param limit
212      *            A limit on the number of messages returned in a single call.
213      *            Use -1 for no limit.
214      * @param filter
215      *            A Highland Park filter expression using only built-in filter
216      *            components. Use null for "no filter".
217      * 
218      * @return a consumer
219      */
220     public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
221             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
222         return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
223                 filter, apiKey, apiSecret);
224     }
225
226     /**
227      * Create a consumer instance with the default timeout, and no limit on
228      * messages returned. This consumer can operate in a logical group and is
229      * re-startable across sessions when you use the same group and ID on
230      * restart. This consumer also uses server-side filtering.
231      * 
232      * @param hostSet
233      *            The host used in the URL to MR. Entries can be "host:port".
234      * @param topic
235      *            The topic to consume
236      * @param consumerGroup
237      *            The name of the consumer group this consumer is part of
238      * @param consumerId
239      *            The unique id of this consume in its group
240      * @param timeoutMs
241      *            The amount of time in milliseconds that the server should keep
242      *            the connection open while waiting for message traffic. Use -1
243      *            for default timeout.
244      * @param limit
245      *            A limit on the number of messages returned in a single call.
246      *            Use -1 for no limit.
247      * @param filter
248      *            A Highland Park filter expression using only built-in filter
249      *            components. Use null for "no filter".
250      * 
251      * @return a consumer
252      */
253     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
254             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
255         if (MRClientBuilders.sfConsumerMock != null)
256             return MRClientBuilders.sfConsumerMock;
257         try {
258             return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
259                     apiSecret);
260         } catch (MalformedURLException e) {
261             throw new IllegalArgumentException(e);
262         }
263     }
264
265     /*************************************************************************/
266     /*************************************************************************/
267     /*************************************************************************/
268
269     /**
270      * Create a publisher that sends each message (or group of messages)
271      * immediately. Most applications should favor higher latency for much
272      * higher message throughput and the "simple publisher" is not a good
273      * choice.
274      * 
275      * @param hostlist
276      *            The host used in the URL to MR. Can be "host:port", can be
277      *            multiple comma-separated entries.
278      * @param topic
279      *            The topic on which to publish messages.
280      * @return a publisher
281      */
282     public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
283         return createBatchingPublisher(hostlist, topic, 1, 1);
284     }
285
286     /**
287      * Create a publisher that batches messages. Be sure to close the publisher
288      * to send the last batch and ensure a clean shutdown. Message payloads are
289      * not compressed.
290      * 
291      * @param hostlist
292      *            The host used in the URL to MR. Can be "host:port", can be
293      *            multiple comma-separated entries.
294      * @param topic
295      *            The topic on which to publish messages.
296      * @param maxBatchSize
297      *            The largest set of messages to batch
298      * @param maxAgeMs
299      *            The maximum age of a message waiting in a batch
300      * 
301      * @return a publisher
302      */
303     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
304             long maxAgeMs) {
305         return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
306     }
307
308     /**
309      * Create a publisher that batches messages. Be sure to close the publisher
310      * to send the last batch and ensure a clean shutdown.
311      * 
312      * @param hostlist
313      *            The host used in the URL to MR. Can be "host:port", can be
314      *            multiple comma-separated entries.
315      * @param topic
316      *            The topic on which to publish messages.
317      * @param maxBatchSize
318      *            The largest set of messages to batch
319      * @param maxAgeMs
320      *            The maximum age of a message waiting in a batch
321      * @param compress
322      *            use gzip compression
323      * 
324      * @return a publisher
325      */
326     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
327             long maxAgeMs, boolean compress) {
328         return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
329     }
330
331     /**
332      * Create a publisher that batches messages. Be sure to close the publisher
333      * to send the last batch and ensure a clean shutdown.
334      * 
335      * @param hostSet
336      *            A set of hosts to be used in the URL to MR. Can be
337      *            "host:port". Use multiple entries to enable failover.
338      * @param topic
339      *            The topic on which to publish messages.
340      * @param maxBatchSize
341      *            The largest set of messages to batch
342      * @param maxAgeMs
343      *            The maximum age of a message waiting in a batch
344      * @param compress
345      *            use gzip compression
346      * 
347      * @return a publisher
348      */
349     public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
350             long maxAgeMs, boolean compress) {
351         final TreeSet<String> hosts = new TreeSet<>();
352         for (String hp : hostSet) {
353             hosts.add(hp);
354         }
355         return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
356     }
357
358     /**
359      * Create a publisher that batches messages. Be sure to close the publisher
360      * to send the last batch and ensure a clean shutdown.
361      * 
362      * @param hostSet
363      *            A set of hosts to be used in the URL to MR. Can be
364      *            "host:port". Use multiple entries to enable failover.
365      * @param topic
366      *            The topic on which to publish messages.
367      * @param maxBatchSize
368      *            The largest set of messages to batch
369      * @param maxAgeMs
370      *            The maximum age of a message waiting in a batch
371      * @param compress
372      *            use gzip compression
373      * 
374      * @return a publisher
375      */
376     public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
377             int maxBatchSize, long maxAgeMs, boolean compress) {
378         return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
379                 .compress(compress).build();
380     }
381
382     /**
383      * Create a publisher that batches messages. Be sure to close the publisher
384      * to send the last batch and ensure a clean shutdown.
385      * 
386      * @param host
387      *            A host to be used in the URL to MR. Can be "host:port". Use
388      *            multiple entries to enable failover.
389      * @param topic
390      *            The topic on which to publish messages.
391      * @param username
392      *            username
393      * @param password
394      *            password
395      * @param maxBatchSize
396      *            The largest set of messages to batch
397      * @param maxAgeMs
398      *            The maximum age of a message waiting in a batch
399      * @param compress
400      *            use gzip compression
401      * @param protocolFlag
402      *            http auth or ueb auth or dme2 method
403      * @param producerFilePath
404      *            all properties for publisher
405      * @return MRBatchingPublisher obj
406      */
407     public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
408             final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
409         MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
410                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
411                 .compress(compress).build();
412
413         pub.setHost(host);
414         pub.setUsername(username);
415         pub.setPassword(password);
416         pub.setProtocolFlag(protocolFlag);
417         return pub;
418     }
419
420     /**
421      * Create a publisher that batches messages. Be sure to close the publisher
422      * to send the last batch and ensure a clean shutdown
423      * 
424      * @param Properties
425      *            props set all properties for publishing message
426      * @return MRBatchingPublisher obj
427      * @throws FileNotFoundException
428      *             exc
429      * @throws IOException
430      *             ioex
431      */
432     public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
433             throws FileNotFoundException, IOException {
434         return createInternalBatchingPublisher(props, withResponse);
435     }
436
437     /**
438      * Create a publisher that batches messages. Be sure to close the publisher
439      * to send the last batch and ensure a clean shutdown
440      * 
441      * @param Properties
442      *            props set all properties for publishing message
443      * @return MRBatchingPublisher obj
444      * @throws FileNotFoundException
445      *             exc
446      * @throws IOException
447      *             ioex
448      */
449     public static MRBatchingPublisher createBatchingPublisher(Properties props)
450             throws FileNotFoundException, IOException {
451         return createInternalBatchingPublisher(props, false);
452     }
453
454     /**
455      * Create a publisher that batches messages. Be sure to close the publisher
456      * to send the last batch and ensure a clean shutdown
457      * 
458      * @param producerFilePath
459      *            set all properties for publishing message
460      * @return MRBatchingPublisher obj
461      * @throws FileNotFoundException
462      *             exc
463      * @throws IOException
464      *             ioex
465      */
466     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
467             throws FileNotFoundException, IOException {
468         FileReader reader = new FileReader(new File(producerFilePath));
469         Properties props = new Properties();
470         props.load(reader);
471         return createBatchingPublisher(props);
472     }
473
474     /**
475      * Create a publisher that will contain send methods that return response
476      * object to user.
477      * 
478      * @param producerFilePath
479      *            set all properties for publishing message
480      * @return MRBatchingPublisher obj
481      * @throws FileNotFoundException
482      *             exc
483      * @throws IOException
484      *             ioex
485      */
486     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
487             throws FileNotFoundException, IOException {
488         FileReader reader = new FileReader(new File(producerFilePath));
489         Properties props = new Properties();
490         props.load(reader);
491         return createBatchingPublisher(props, withResponse);
492     }
493
494     protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
495             throws FileNotFoundException, IOException {
496         assert props != null;
497         MRSimplerBatchPublisher pub;
498         if (withResponse) {
499             pub = new MRSimplerBatchPublisher.Builder()
500                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
501                     .onTopic(props.getProperty(TOPIC))
502                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
503                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
504                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
505                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
506                     .withResponse(withResponse).build();
507         } else {
508             pub = new MRSimplerBatchPublisher.Builder()
509                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
510                     .onTopic(props.getProperty(TOPIC))
511                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
512                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
513                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
514                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
515         }
516         pub.setHost(props.getProperty("host"));
517         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
518
519             pub.setAuthKey(props.getProperty(AUTH_KEY));
520             pub.setAuthDate(props.getProperty(AUTH_DATE));
521             pub.setUsername(props.getProperty(USERNAME));
522             pub.setPassword(props.getProperty(PASSWORD));
523         } else {
524             pub.setUsername(props.getProperty(USERNAME));
525             pub.setPassword(props.getProperty(PASSWORD));
526         }
527         pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
528         pub.setProps(props);
529         prop = new Properties();
530         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
531             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
532             routeReader = new FileReader(new File(routeFilePath));
533             File fo = new File(routeFilePath);
534             if (!fo.exists()) {
535                 routeWriter = new FileWriter(new File(routeFilePath));
536             }
537         }
538         return pub;
539     }
540
541     /**
542      * Create an identity manager client to work with API keys.
543      * 
544      * @param hostSet
545      *            A set of hosts to be used in the URL to MR. Can be
546      *            "host:port". Use multiple entries to enable failover.
547      * @param apiKey
548      *            Your API key
549      * @param apiSecret
550      *            Your API secret
551      * @return an identity manager
552      */
553     public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
554         MRIdentityManager cim;
555         try {
556             cim = new MRMetaClient(hostSet);
557         } catch (MalformedURLException e) {
558             throw new IllegalArgumentException(e);
559         }
560         cim.setApiCredentials(apiKey, apiSecret);
561         return cim;
562     }
563
564     /**
565      * Create a topic manager for working with topics.
566      * 
567      * @param hostSet
568      *            A set of hosts to be used in the URL to MR. Can be
569      *            "host:port". Use multiple entries to enable failover.
570      * @param apiKey
571      *            Your API key
572      * @param apiSecret
573      *            Your API secret
574      * @return a topic manager
575      */
576     public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
577         MRMetaClient tmi;
578         try {
579             tmi = new MRMetaClient(hostSet);
580         } catch (MalformedURLException e) {
581             throw new IllegalArgumentException(e);
582         }
583         tmi.setApiCredentials(apiKey, apiSecret);
584         return tmi;
585     }
586
587     /**
588      * Inject a consumer. Used to support unit tests.
589      * 
590      * @param cc
591      */
592     public static void $testInject(MRConsumer cc) {
593         MRClientBuilders.sfConsumerMock = cc;
594     }
595
596     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
597             String id, int i, int j, String protocalFlag, String consumerFilePath) {
598
599         MRConsumerImpl sub;
600         try {
601             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
602         } catch (MalformedURLException e) {
603             throw new IllegalArgumentException(e);
604         }
605         sub.setUsername(username);
606         sub.setPassword(password);
607         sub.setHost(host);
608         sub.setProtocolFlag(protocalFlag);
609         sub.setConsumerFilePath(consumerFilePath);
610         return sub;
611
612     }
613
614     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
615             String id, String protocalFlag, String consumerFilePath, int i, int j) {
616
617         MRConsumerImpl sub;
618         try {
619             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
620         } catch (MalformedURLException e) {
621             throw new IllegalArgumentException(e);
622         }
623         sub.setUsername(username);
624         sub.setPassword(password);
625         sub.setHost(host);
626         sub.setProtocolFlag(protocalFlag);
627         sub.setConsumerFilePath(consumerFilePath);
628         return sub;
629
630     }
631
632     public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
633         FileReader reader = new FileReader(new File(consumerFilePath));
634         Properties props = new Properties();
635         props.load(reader);
636
637         return createConsumer(props);
638     }
639
640     public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
641         int timeout;
642         ValidatorUtil.validateSubscriber(props);
643         if (props.getProperty("timeout") != null)
644             timeout = Integer.parseInt(props.getProperty("timeout"));
645         else
646             timeout = -1;
647         int limit;
648         if (props.getProperty("limit") != null)
649             limit = Integer.parseInt(props.getProperty("limit"));
650         else
651             limit = -1;
652         String group;
653         if (props.getProperty("group") == null)
654             group = UUID.randomUUID().toString();
655         else
656             group = props.getProperty("group");
657         MRConsumerImpl sub = null;
658         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
659             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
660                     group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
661                     props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
662             sub.setAuthKey(props.getProperty(AUTH_KEY));
663             sub.setAuthDate(props.getProperty(AUTH_DATE));
664             sub.setUsername(props.getProperty(USERNAME));
665             sub.setPassword(props.getProperty(PASSWORD));
666         } else {
667             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
668                     group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
669                     props.getProperty(USERNAME), props.getProperty(PASSWORD));
670             sub.setUsername(props.getProperty(USERNAME));
671             sub.setPassword(props.getProperty(PASSWORD));
672         }
673         
674         sub.setProps(props);
675         sub.setHost(props.getProperty("host"));
676         sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
677         sub.setfFilter(props.getProperty("filter"));
678         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
679             MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
680             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
681             routeReader = new FileReader(new File(routeFilePath));
682             prop = new Properties();
683             File fo = new File(routeFilePath);
684                 if (!fo.exists()) {
685                     routeWriter = new FileWriter(new File(routeFilePath));
686                 }
687         }
688         
689         return sub;
690     }
691 }