clean MR codebase
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  * ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *  
23  *******************************************************************************/
24 package org.onap.dmaap.mr.client;
25
26 import java.io.File;
27 import java.io.FileNotFoundException;
28 import java.io.FileReader;
29 import java.io.FileWriter;
30 import java.io.IOException;
31 import java.net.MalformedURLException;
32 import java.util.Collection;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.TreeSet;
36 import java.util.UUID;
37
38 import javax.ws.rs.core.MultivaluedMap;
39
40 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
41 import org.onap.dmaap.mr.client.impl.MRMetaClient;
42 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
43 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
44 import org.onap.dmaap.mr.tools.ValidatorUtil;
45
46 /**
47  * A factory for MR clients.<br/>
48  * <br/>
49  * Use caution selecting a consumer creator factory. If the call doesn't accept
50  * a consumer group name, then it creates a consumer that is not restartable.
51  * That is, if you stop your process and start it again, your client will NOT
52  * receive any missed messages on the topic. If you need to ensure receipt of
53  * missed messages, then you must use a consumer that's created with a group
54  * name and ID. (If you create multiple consumer processes using the same group,
55  * load is split across them. Be sure to use a different ID for each
56  * instance.)<br/>
57  * <br/>
58  * Publishers
59  * 
60  * @author author
61  */
62 public class MRClientFactory {
63     private static final String AUTH_KEY = "authKey";
64     private static final String AUTH_DATE = "authDate";
65     private static final String PASSWORD = "password";
66     private static final String USERNAME = "username";
67     private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
68     private static final String TOPIC = "topic";
69     private static final String TRANSPORT_TYPE = "TransportType";
70     public static MultivaluedMap<String, Object> HTTPHeadersMap;
71     public static Map<String, String> DME2HeadersMap;
72     public static String routeFilePath;
73
74     public static FileReader routeReader;
75
76     public static FileWriter routeWriter = null;
77     public static Properties prop = null;
78
79     /**
80      * Instantiates MRClientFactory.
81      */
82     private MRClientFactory() {
83         //prevents instantiation.
84     }
85     
86     /**
87      * Create a consumer instance with the default timeout and no limit on
88      * messages returned. This consumer operates as an independent consumer
89      * (i.e., not in a group) and is NOT re-startable across sessions.
90      * 
91      * @param hostList
92      *            A comma separated list of hosts to use to connect to MR. You
93      *            can include port numbers (3904 is the default). For example,
94      *            "hostname:8080,"
95      * 
96      * @param topic
97      *            The topic to consume
98      * 
99      * @return a consumer
100      */
101     public static MRConsumer createConsumer(String hostList, String topic) {
102         return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
103     }
104
105     /**
106      * Create a consumer instance with the default timeout and no limit on
107      * messages returned. This consumer operates as an independent consumer
108      * (i.e., not in a group) and is NOT re-startable across sessions.
109      * 
110      * @param hostSet
111      *            The host used in the URL to MR. Entries can be "host:port".
112      * @param topic
113      *            The topic to consume
114      * 
115      * @return a consumer
116      */
117     public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
118         return createConsumer(hostSet, topic, null);
119     }
120
121     /**
122      * Create a consumer instance with server-side filtering, the default
123      * timeout, and no limit on messages returned. This consumer operates as an
124      * independent consumer (i.e., not in a group) and is NOT re-startable
125      * across sessions.
126      * 
127      * @param hostSet
128      *            The host used in the URL to MR. Entries can be "host:port".
129      * @param topic
130      *            The topic to consume
131      * @param filter
132      *            a filter to use on the server side
133      * 
134      * @return a consumer
135      */
136     public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
137         return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
138     }
139
140     /**
141      * Create a consumer instance with the default timeout, and no limit on
142      * messages returned. This consumer can operate in a logical group and is
143      * re-startable across sessions when you use the same group and ID on
144      * restart.
145      * 
146      * @param hostSet
147      *            The host used in the URL to MR. Entries can be "host:port".
148      * @param topic
149      *            The topic to consume
150      * @param consumerGroup
151      *            The name of the consumer group this consumer is part of
152      * @param consumerId
153      *            The unique id of this consume in its group
154      * 
155      * @return a consumer
156      */
157     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
158             final String consumerId) {
159         return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
160     }
161
162     /**
163      * Create a consumer instance with the default timeout, and no limit on
164      * messages returned. This consumer can operate in a logical group and is
165      * re-startable across sessions when you use the same group and ID on
166      * restart.
167      * 
168      * @param hostSet
169      *            The host used in the URL to MR. Entries can be "host:port".
170      * @param topic
171      *            The topic to consume
172      * @param consumerGroup
173      *            The name of the consumer group this consumer is part of
174      * @param consumerId
175      *            The unique id of this consume in its group
176      * @param timeoutMs
177      *            The amount of time in milliseconds that the server should keep
178      *            the connection open while waiting for message traffic. Use -1
179      *            for default timeout.
180      * @param limit
181      *            A limit on the number of messages returned in a single call.
182      *            Use -1 for no limit.
183      * 
184      * @return a consumer
185      */
186     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
187             final String consumerId, int timeoutMs, int limit) {
188         return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
189     }
190
191     /**
192      * Create a consumer instance with the default timeout, and no limit on
193      * messages returned. This consumer can operate in a logical group and is
194      * re-startable across sessions when you use the same group and ID on
195      * restart. This consumer also uses server-side filtering.
196      * 
197      * @param hostList
198      *            A comma separated list of hosts to use to connect to MR. You
199      *            can include port numbers (3904 is the default)"
200      * @param topic
201      *            The topic to consume
202      * @param consumerGroup
203      *            The name of the consumer group this consumer is part of
204      * @param consumerId
205      *            The unique id of this consume in its group
206      * @param timeoutMs
207      *            The amount of time in milliseconds that the server should keep
208      *            the connection open while waiting for message traffic. Use -1
209      *            for default timeout.
210      * @param limit
211      *            A limit on the number of messages returned in a single call.
212      *            Use -1 for no limit.
213      * @param filter
214      *            A Highland Park filter expression using only built-in filter
215      *            components. Use null for "no filter".
216      * 
217      * @return a consumer
218      */
219     public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
220             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
221         return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
222                 filter, apiKey, apiSecret);
223     }
224
225     /**
226      * Create a consumer instance with the default timeout, and no limit on
227      * messages returned. This consumer can operate in a logical group and is
228      * re-startable across sessions when you use the same group and ID on
229      * restart. This consumer also uses server-side filtering.
230      * 
231      * @param hostSet
232      *            The host used in the URL to MR. Entries can be "host:port".
233      * @param topic
234      *            The topic to consume
235      * @param consumerGroup
236      *            The name of the consumer group this consumer is part of
237      * @param consumerId
238      *            The unique id of this consume in its group
239      * @param timeoutMs
240      *            The amount of time in milliseconds that the server should keep
241      *            the connection open while waiting for message traffic. Use -1
242      *            for default timeout.
243      * @param limit
244      *            A limit on the number of messages returned in a single call.
245      *            Use -1 for no limit.
246      * @param filter
247      *            A Highland Park filter expression using only built-in filter
248      *            components. Use null for "no filter".
249      * 
250      * @return a consumer
251      */
252     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
253             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
254         if (MRClientBuilders.sfConsumerMock != null)
255             return MRClientBuilders.sfConsumerMock;
256         try {
257             return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
258                     apiSecret);
259         } catch (MalformedURLException e) {
260             throw new IllegalArgumentException(e);
261         }
262     }
263
264     /*************************************************************************/
265     /*************************************************************************/
266     /*************************************************************************/
267
268     /**
269      * Create a publisher that sends each message (or group of messages)
270      * immediately. Most applications should favor higher latency for much
271      * higher message throughput and the "simple publisher" is not a good
272      * choice.
273      * 
274      * @param hostlist
275      *            The host used in the URL to MR. Can be "host:port", can be
276      *            multiple comma-separated entries.
277      * @param topic
278      *            The topic on which to publish messages.
279      * @return a publisher
280      */
281     public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
282         return createBatchingPublisher(hostlist, topic, 1, 1);
283     }
284
285     /**
286      * Create a publisher that batches messages. Be sure to close the publisher
287      * to send the last batch and ensure a clean shutdown. Message payloads are
288      * not compressed.
289      * 
290      * @param hostlist
291      *            The host used in the URL to MR. Can be "host:port", can be
292      *            multiple comma-separated entries.
293      * @param topic
294      *            The topic on which to publish messages.
295      * @param maxBatchSize
296      *            The largest set of messages to batch
297      * @param maxAgeMs
298      *            The maximum age of a message waiting in a batch
299      * 
300      * @return a publisher
301      */
302     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
303             long maxAgeMs) {
304         return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
305     }
306
307     /**
308      * Create a publisher that batches messages. Be sure to close the publisher
309      * to send the last batch and ensure a clean shutdown.
310      * 
311      * @param hostlist
312      *            The host used in the URL to MR. Can be "host:port", can be
313      *            multiple comma-separated entries.
314      * @param topic
315      *            The topic on which to publish messages.
316      * @param maxBatchSize
317      *            The largest set of messages to batch
318      * @param maxAgeMs
319      *            The maximum age of a message waiting in a batch
320      * @param compress
321      *            use gzip compression
322      * 
323      * @return a publisher
324      */
325     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
326             long maxAgeMs, boolean compress) {
327         return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
328     }
329
330     /**
331      * Create a publisher that batches messages. Be sure to close the publisher
332      * to send the last batch and ensure a clean shutdown.
333      * 
334      * @param hostSet
335      *            A set of hosts to be used in the URL to MR. Can be
336      *            "host:port". Use multiple entries to enable failover.
337      * @param topic
338      *            The topic on which to publish messages.
339      * @param maxBatchSize
340      *            The largest set of messages to batch
341      * @param maxAgeMs
342      *            The maximum age of a message waiting in a batch
343      * @param compress
344      *            use gzip compression
345      * 
346      * @return a publisher
347      */
348     public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
349             long maxAgeMs, boolean compress) {
350         final TreeSet<String> hosts = new TreeSet<>();
351         for (String hp : hostSet) {
352             hosts.add(hp);
353         }
354         return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
355     }
356
357     /**
358      * Create a publisher that batches messages. Be sure to close the publisher
359      * to send the last batch and ensure a clean shutdown.
360      * 
361      * @param hostSet
362      *            A set of hosts to be used in the URL to MR. Can be
363      *            "host:port". Use multiple entries to enable failover.
364      * @param topic
365      *            The topic on which to publish messages.
366      * @param maxBatchSize
367      *            The largest set of messages to batch
368      * @param maxAgeMs
369      *            The maximum age of a message waiting in a batch
370      * @param compress
371      *            use gzip compression
372      * 
373      * @return a publisher
374      */
375     public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
376             int maxBatchSize, long maxAgeMs, boolean compress) {
377         return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
378                 .compress(compress).build();
379     }
380
381     /**
382      * Create a publisher that batches messages. Be sure to close the publisher
383      * to send the last batch and ensure a clean shutdown.
384      * 
385      * @param host
386      *            A host to be used in the URL to MR. Can be "host:port". Use
387      *            multiple entries to enable failover.
388      * @param topic
389      *            The topic on which to publish messages.
390      * @param username
391      *            username
392      * @param password
393      *            password
394      * @param maxBatchSize
395      *            The largest set of messages to batch
396      * @param maxAgeMs
397      *            The maximum age of a message waiting in a batch
398      * @param compress
399      *            use gzip compression
400      * @param protocolFlag
401      *            http auth or ueb auth or dme2 method
402      * @param producerFilePath
403      *            all properties for publisher
404      * @return MRBatchingPublisher obj
405      */
406     public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
407             final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
408         MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
409                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
410                 .compress(compress).build();
411
412         pub.setHost(host);
413         pub.setUsername(username);
414         pub.setPassword(password);
415         pub.setProtocolFlag(protocolFlag);
416         return pub;
417     }
418
419     /**
420      * Create a publisher that batches messages. Be sure to close the publisher
421      * to send the last batch and ensure a clean shutdown
422      * 
423      * @param Properties
424      *            props set all properties for publishing message
425      * @return MRBatchingPublisher obj
426      * @throws FileNotFoundException
427      *             exc
428      * @throws IOException
429      *             ioex
430      */
431     public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
432             throws FileNotFoundException, IOException {
433         return createInternalBatchingPublisher(props, withResponse);
434     }
435
436     /**
437      * Create a publisher that batches messages. Be sure to close the publisher
438      * to send the last batch and ensure a clean shutdown
439      * 
440      * @param Properties
441      *            props set all properties for publishing message
442      * @return MRBatchingPublisher obj
443      * @throws FileNotFoundException
444      *             exc
445      * @throws IOException
446      *             ioex
447      */
448     public static MRBatchingPublisher createBatchingPublisher(Properties props)
449             throws FileNotFoundException, IOException {
450         return createInternalBatchingPublisher(props, false);
451     }
452
453     /**
454      * Create a publisher that batches messages. Be sure to close the publisher
455      * to send the last batch and ensure a clean shutdown
456      * 
457      * @param producerFilePath
458      *            set all properties for publishing message
459      * @return MRBatchingPublisher obj
460      * @throws FileNotFoundException
461      *             exc
462      * @throws IOException
463      *             ioex
464      */
465     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
466             throws FileNotFoundException, IOException {
467         FileReader reader = new FileReader(new File(producerFilePath));
468         Properties props = new Properties();
469         props.load(reader);
470         return createBatchingPublisher(props);
471     }
472
473     /**
474      * Create a publisher that will contain send methods that return response
475      * object to user.
476      * 
477      * @param producerFilePath
478      *            set all properties for publishing message
479      * @return MRBatchingPublisher obj
480      * @throws FileNotFoundException
481      *             exc
482      * @throws IOException
483      *             ioex
484      */
485     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
486             throws FileNotFoundException, IOException {
487         FileReader reader = new FileReader(new File(producerFilePath));
488         Properties props = new Properties();
489         props.load(reader);
490         return createBatchingPublisher(props, withResponse);
491     }
492
493     protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
494             throws FileNotFoundException, IOException {
495         assert props != null;
496         MRSimplerBatchPublisher pub;
497         if (withResponse) {
498             pub = new MRSimplerBatchPublisher.Builder()
499                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
500                     .onTopic(props.getProperty(TOPIC))
501                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
502                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
503                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
504                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
505                     .withResponse(withResponse).build();
506         } else {
507             pub = new MRSimplerBatchPublisher.Builder()
508                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
509                     .onTopic(props.getProperty(TOPIC))
510                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
511                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
512                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
513                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
514         }
515         pub.setHost(props.getProperty("host"));
516         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
517
518             pub.setAuthKey(props.getProperty(AUTH_KEY));
519             pub.setAuthDate(props.getProperty(AUTH_DATE));
520             pub.setUsername(props.getProperty(USERNAME));
521             pub.setPassword(props.getProperty(PASSWORD));
522         } else {
523             pub.setUsername(props.getProperty(USERNAME));
524             pub.setPassword(props.getProperty(PASSWORD));
525         }
526         pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
527         pub.setProps(props);
528         prop = new Properties();
529         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
530             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
531             routeReader = new FileReader(new File(routeFilePath));
532             File fo = new File(routeFilePath);
533             if (!fo.exists()) {
534                 routeWriter = new FileWriter(new File(routeFilePath));
535             }
536         }
537         return pub;
538     }
539
540     /**
541      * Create an identity manager client to work with API keys.
542      * 
543      * @param hostSet
544      *            A set of hosts to be used in the URL to MR. Can be
545      *            "host:port". Use multiple entries to enable failover.
546      * @param apiKey
547      *            Your API key
548      * @param apiSecret
549      *            Your API secret
550      * @return an identity manager
551      */
552     public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
553         MRIdentityManager cim;
554         try {
555             cim = new MRMetaClient(hostSet);
556         } catch (MalformedURLException e) {
557             throw new IllegalArgumentException(e);
558         }
559         cim.setApiCredentials(apiKey, apiSecret);
560         return cim;
561     }
562
563     /**
564      * Create a topic manager for working with topics.
565      * 
566      * @param hostSet
567      *            A set of hosts to be used in the URL to MR. Can be
568      *            "host:port". Use multiple entries to enable failover.
569      * @param apiKey
570      *            Your API key
571      * @param apiSecret
572      *            Your API secret
573      * @return a topic manager
574      */
575     public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
576         MRMetaClient tmi;
577         try {
578             tmi = new MRMetaClient(hostSet);
579         } catch (MalformedURLException e) {
580             throw new IllegalArgumentException(e);
581         }
582         tmi.setApiCredentials(apiKey, apiSecret);
583         return tmi;
584     }
585
586     /**
587      * Inject a consumer. Used to support unit tests.
588      * 
589      * @param cc
590      */
591     public static void $testInject(MRConsumer cc) {
592         MRClientBuilders.sfConsumerMock = cc;
593     }
594
595     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
596             String id, int i, int j, String protocalFlag, String consumerFilePath) {
597
598         MRConsumerImpl sub;
599         try {
600             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
601         } catch (MalformedURLException e) {
602             throw new IllegalArgumentException(e);
603         }
604         sub.setUsername(username);
605         sub.setPassword(password);
606         sub.setHost(host);
607         sub.setProtocolFlag(protocalFlag);
608         sub.setConsumerFilePath(consumerFilePath);
609         return sub;
610
611     }
612
613     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
614             String id, String protocalFlag, String consumerFilePath, int i, int j) {
615
616         MRConsumerImpl sub;
617         try {
618             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
619         } catch (MalformedURLException e) {
620             throw new IllegalArgumentException(e);
621         }
622         sub.setUsername(username);
623         sub.setPassword(password);
624         sub.setHost(host);
625         sub.setProtocolFlag(protocalFlag);
626         sub.setConsumerFilePath(consumerFilePath);
627         return sub;
628
629     }
630
631     public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
632         FileReader reader = new FileReader(new File(consumerFilePath));
633         Properties props = new Properties();
634         props.load(reader);
635
636         return createConsumer(props);
637     }
638
639     public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
640         int timeout;
641         ValidatorUtil.validateSubscriber(props);
642         if (props.getProperty("timeout") != null)
643             timeout = Integer.parseInt(props.getProperty("timeout"));
644         else
645             timeout = -1;
646         int limit;
647         if (props.getProperty("limit") != null)
648             limit = Integer.parseInt(props.getProperty("limit"));
649         else
650             limit = -1;
651         String group;
652         if (props.getProperty("group") == null)
653             group = UUID.randomUUID().toString();
654         else
655             group = props.getProperty("group");
656         MRConsumerImpl sub = null;
657         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
658             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
659                     group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
660                     props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
661             sub.setAuthKey(props.getProperty(AUTH_KEY));
662             sub.setAuthDate(props.getProperty(AUTH_DATE));
663             sub.setUsername(props.getProperty(USERNAME));
664             sub.setPassword(props.getProperty(PASSWORD));
665         } else {
666             sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
667                     group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
668                     props.getProperty(USERNAME), props.getProperty(PASSWORD));
669             sub.setUsername(props.getProperty(USERNAME));
670             sub.setPassword(props.getProperty(PASSWORD));
671         }
672         
673         sub.setProps(props);
674         sub.setHost(props.getProperty("host"));
675         sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
676         sub.setfFilter(props.getProperty("filter"));
677         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
678             MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
679             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
680             routeReader = new FileReader(new File(routeFilePath));
681             prop = new Properties();
682             File fo = new File(routeFilePath);
683                 if (!fo.exists()) {
684                     routeWriter = new FileWriter(new File(routeFilePath));
685                 }
686         }
687         
688         return sub;
689     }
690 }