97aa0b897635c9c8bb73db6f939f4904d2fabf78
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client;
23
24 import java.io.File;
25 import java.io.FileNotFoundException;
26 import java.io.FileReader;
27 import java.io.FileWriter;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.util.Collection;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.TreeSet;
34 import java.util.UUID;
35
36 import javax.ws.rs.core.MultivaluedMap;
37
38 import com.att.nsa.mr.client.impl.MRConsumerImpl;
39 import com.att.nsa.mr.client.impl.MRMetaClient;
40 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42 import com.att.nsa.mr.tools.ValidatorUtil;
43
44 /**
45  * A factory for MR clients.<br/>
46  * <br/>
47  * Use caution selecting a consumer creator factory. If the call doesn't accept
48  * a consumer group name, then it creates a consumer that is not restartable.
49  * That is, if you stop your process and start it again, your client will NOT
50  * receive any missed messages on the topic. If you need to ensure receipt of
51  * missed messages, then you must use a consumer that's created with a group
52  * name and ID. (If you create multiple consumer processes using the same group,
53  * load is split across them. Be sure to use a different ID for each
54  * instance.)<br/>
55  * <br/>
56  * Publishers
57  * 
58  * @author author
59  */
60 public class MRClientFactory {
61         public static MultivaluedMap<String, Object> HTTPHeadersMap;
62         public static Map<String, String> DME2HeadersMap;
63         public static String routeFilePath;
64
65         public static FileReader routeReader;
66
67         public static FileWriter routeWriter = null;
68         public static Properties prop = null;
69
70         
71         // props= new Properties();
72         /**
73          * Create a consumer instance with the default timeout and no limit on
74          * messages returned. This consumer operates as an independent consumer
75          * (i.e., not in a group) and is NOT re-startable across sessions.
76          * 
77          * @param hostList
78          *            A comma separated list of hosts to use to connect to MR. You
79          *            can include port numbers (3904 is the default). For example,
80          *            "hostname:8080,"
81          * 
82          * @param topic
83          *            The topic to consume
84          * 
85          * @return a consumer
86          */
87         public static MRConsumer createConsumer(String hostList, String topic) {
88                 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
89         }
90
91         /**
92          * Create a consumer instance with the default timeout and no limit on
93          * messages returned. This consumer operates as an independent consumer
94          * (i.e., not in a group) and is NOT re-startable across sessions.
95          * 
96          * @param hostSet
97          *            The host used in the URL to MR. Entries can be "host:port".
98          * @param topic
99          *            The topic to consume
100          * 
101          * @return a consumer
102          */
103         public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
104                 return createConsumer(hostSet, topic, null);
105         }
106
107         /**
108          * Create a consumer instance with server-side filtering, the default
109          * timeout, and no limit on messages returned. This consumer operates as an
110          * independent consumer (i.e., not in a group) and is NOT re-startable
111          * across sessions.
112          * 
113          * @param hostSet
114          *            The host used in the URL to MR. Entries can be "host:port".
115          * @param topic
116          *            The topic to consume
117          * @param filter
118          *            a filter to use on the server side
119          * 
120          * @return a consumer
121          */
122         public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
123                 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
124         }
125
126         /**
127          * Create a consumer instance with the default timeout, and no limit on
128          * messages returned. This consumer can operate in a logical group and is
129          * re-startable across sessions when you use the same group and ID on
130          * restart.
131          * 
132          * @param hostSet
133          *            The host used in the URL to MR. Entries can be "host:port".
134          * @param topic
135          *            The topic to consume
136          * @param consumerGroup
137          *            The name of the consumer group this consumer is part of
138          * @param consumerId
139          *            The unique id of this consume in its group
140          * 
141          * @return a consumer
142          */
143         public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
144                         final String consumerId) {
145                 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
146         }
147
148         /**
149          * Create a consumer instance with the default timeout, and no limit on
150          * messages returned. This consumer can operate in a logical group and is
151          * re-startable across sessions when you use the same group and ID on
152          * restart.
153          * 
154          * @param hostSet
155          *            The host used in the URL to MR. Entries can be "host:port".
156          * @param topic
157          *            The topic to consume
158          * @param consumerGroup
159          *            The name of the consumer group this consumer is part of
160          * @param consumerId
161          *            The unique id of this consume in its group
162          * @param timeoutMs
163          *            The amount of time in milliseconds that the server should keep
164          *            the connection open while waiting for message traffic. Use -1
165          *            for default timeout.
166          * @param limit
167          *            A limit on the number of messages returned in a single call.
168          *            Use -1 for no limit.
169          * 
170          * @return a consumer
171          */
172         public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
173                         final String consumerId, int timeoutMs, int limit) {
174                 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
175         }
176
177         /**
178          * Create a consumer instance with the default timeout, and no limit on
179          * messages returned. This consumer can operate in a logical group and is
180          * re-startable across sessions when you use the same group and ID on
181          * restart. This consumer also uses server-side filtering.
182          * 
183          * @param hostList
184          *            A comma separated list of hosts to use to connect to MR. You
185          *            can include port numbers (3904 is the default). For example,
186          *            "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
187          * @param topic
188          *            The topic to consume
189          * @param consumerGroup
190          *            The name of the consumer group this consumer is part of
191          * @param consumerId
192          *            The unique id of this consume in its group
193          * @param timeoutMs
194          *            The amount of time in milliseconds that the server should keep
195          *            the connection open while waiting for message traffic. Use -1
196          *            for default timeout.
197          * @param limit
198          *            A limit on the number of messages returned in a single call.
199          *            Use -1 for no limit.
200          * @param filter
201          *            A Highland Park filter expression using only built-in filter
202          *            components. Use null for "no filter".
203          * 
204          * @return a consumer
205          */
206         public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
207                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
208                 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
209                                 filter, apiKey, apiSecret);
210         }
211
212         /**
213          * Create a consumer instance with the default timeout, and no limit on
214          * messages returned. This consumer can operate in a logical group and is
215          * re-startable across sessions when you use the same group and ID on
216          * restart. This consumer also uses server-side filtering.
217          * 
218          * @param hostSet
219          *            The host used in the URL to MR. Entries can be "host:port".
220          * @param topic
221          *            The topic to consume
222          * @param consumerGroup
223          *            The name of the consumer group this consumer is part of
224          * @param consumerId
225          *            The unique id of this consume in its group
226          * @param timeoutMs
227          *            The amount of time in milliseconds that the server should keep
228          *            the connection open while waiting for message traffic. Use -1
229          *            for default timeout.
230          * @param limit
231          *            A limit on the number of messages returned in a single call.
232          *            Use -1 for no limit.
233          * @param filter
234          *            A Highland Park filter expression using only built-in filter
235          *            components. Use null for "no filter".
236          * 
237          * @return a consumer
238          */
239         public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
240                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
241                 if (MRClientBuilders.sfConsumerMock != null)
242                         return MRClientBuilders.sfConsumerMock;
243                 try {
244                         return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
245                                         apiSecret);
246                 } catch (MalformedURLException e) {
247                         throw new IllegalArgumentException(e);
248                 }
249         }
250
251         /*************************************************************************/
252         /*************************************************************************/
253         /*************************************************************************/
254
255         /**
256          * Create a publisher that sends each message (or group of messages)
257          * immediately. Most applications should favor higher latency for much
258          * higher message throughput and the "simple publisher" is not a good
259          * choice.
260          * 
261          * @param hostlist
262          *            The host used in the URL to MR. Can be "host:port", can be
263          *            multiple comma-separated entries.
264          * @param topic
265          *            The topic on which to publish messages.
266          * @return a publisher
267          */
268         public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
269                 return createBatchingPublisher(hostlist, topic, 1, 1);
270         }
271
272         /**
273          * Create a publisher that batches messages. Be sure to close the publisher
274          * to send the last batch and ensure a clean shutdown. Message payloads are
275          * not compressed.
276          * 
277          * @param hostlist
278          *            The host used in the URL to MR. Can be "host:port", can be
279          *            multiple comma-separated entries.
280          * @param topic
281          *            The topic on which to publish messages.
282          * @param maxBatchSize
283          *            The largest set of messages to batch
284          * @param maxAgeMs
285          *            The maximum age of a message waiting in a batch
286          * 
287          * @return a publisher
288          */
289         public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
290                         long maxAgeMs) {
291                 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
292         }
293
294         /**
295          * Create a publisher that batches messages. Be sure to close the publisher
296          * to send the last batch and ensure a clean shutdown.
297          * 
298          * @param hostlist
299          *            The host used in the URL to MR. Can be "host:port", can be
300          *            multiple comma-separated entries.
301          * @param topic
302          *            The topic on which to publish messages.
303          * @param maxBatchSize
304          *            The largest set of messages to batch
305          * @param maxAgeMs
306          *            The maximum age of a message waiting in a batch
307          * @param compress
308          *            use gzip compression
309          * 
310          * @return a publisher
311          */
312         public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
313                         long maxAgeMs, boolean compress) {
314                 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
315         }
316
317         /**
318          * Create a publisher that batches messages. Be sure to close the publisher
319          * to send the last batch and ensure a clean shutdown.
320          * 
321          * @param hostSet
322          *            A set of hosts to be used in the URL to MR. Can be
323          *            "host:port". Use multiple entries to enable failover.
324          * @param topic
325          *            The topic on which to publish messages.
326          * @param maxBatchSize
327          *            The largest set of messages to batch
328          * @param maxAgeMs
329          *            The maximum age of a message waiting in a batch
330          * @param compress
331          *            use gzip compression
332          * 
333          * @return a publisher
334          */
335         public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
336                         long maxAgeMs, boolean compress) {
337                 final TreeSet<String> hosts = new TreeSet<String>();
338                 for (String hp : hostSet) {
339                         hosts.add(hp);
340                 }
341                 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
342         }
343
344         /**
345          * Create a publisher that batches messages. Be sure to close the publisher
346          * to send the last batch and ensure a clean shutdown.
347          * 
348          * @param hostSet
349          *            A set of hosts to be used in the URL to MR. Can be
350          *            "host:port". Use multiple entries to enable failover.
351          * @param topic
352          *            The topic on which to publish messages.
353          * @param maxBatchSize
354          *            The largest set of messages to batch
355          * @param maxAgeMs
356          *            The maximum age of a message waiting in a batch
357          * @param compress
358          *            use gzip compression
359          * 
360          * @return a publisher
361          */
362         public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
363                         int maxBatchSize, long maxAgeMs, boolean compress) {
364                 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
365                                 .compress(compress).build();
366         }
367
368         /**
369          * Create a publisher that batches messages. Be sure to close the publisher
370          * to send the last batch and ensure a clean shutdown.
371          * 
372          * @param host
373          *            A host to be used in the URL to MR. Can be "host:port". Use
374          *            multiple entries to enable failover.
375          * @param topic
376          *            The topic on which to publish messages.
377          * @param username
378          *            username
379          * @param password
380          *            password
381          * @param maxBatchSize
382          *            The largest set of messages to batch
383          * @param maxAgeMs
384          *            The maximum age of a message waiting in a batch
385          * @param compress
386          *            use gzip compression
387          * @param protocolFlag
388          *            http auth or ueb auth or dme2 method
389          * @param producerFilePath
390          *            all properties for publisher
391          * @return MRBatchingPublisher obj
392          */
393         public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
394                         final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
395                         String producerFilePath) {
396                 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
397                                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
398                                 .compress(compress).build();
399
400                 pub.setHost(host);
401                 pub.setUsername(username);
402                 pub.setPassword(password);
403                 pub.setProtocolFlag(protocolFlag);
404                 return pub;
405         }
406
407         /**
408          * Create a publisher that batches messages. Be sure to close the publisher
409          * to send the last batch and ensure a clean shutdown
410          * 
411          * @param Properties
412          *            props set all properties for publishing message
413          * @return MRBatchingPublisher obj
414          * @throws FileNotFoundException
415          *             exc
416          * @throws IOException
417          *             ioex
418          */
419         public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
420                         throws FileNotFoundException, IOException {
421                 return createInternalBatchingPublisher(props, withResponse);
422         }
423
424         /**
425          * Create a publisher that batches messages. Be sure to close the publisher
426          * to send the last batch and ensure a clean shutdown
427          * 
428          * @param Properties
429          *            props set all properties for publishing message
430          * @return MRBatchingPublisher obj
431          * @throws FileNotFoundException
432          *             exc
433          * @throws IOException
434          *             ioex
435          */
436         public static MRBatchingPublisher createBatchingPublisher(Properties props)
437                         throws FileNotFoundException, IOException {
438                 return createInternalBatchingPublisher(props, false);
439         }
440
441         /**
442          * Create a publisher that batches messages. Be sure to close the publisher
443          * to send the last batch and ensure a clean shutdown
444          * 
445          * @param producerFilePath
446          *            set all properties for publishing message
447          * @return MRBatchingPublisher obj
448          * @throws FileNotFoundException
449          *             exc
450          * @throws IOException
451          *             ioex
452          */
453         public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
454                         throws FileNotFoundException, IOException {
455                 FileReader reader = new FileReader(new File(producerFilePath));
456                 Properties props = new Properties();
457                 props.load(reader);
458                 return createBatchingPublisher(props);
459         }
460
461         /**
462          * Create a publisher that will contain send methods that return response
463          * object to user.
464          * 
465          * @param producerFilePath
466          *            set all properties for publishing message
467          * @return MRBatchingPublisher obj
468          * @throws FileNotFoundException
469          *             exc
470          * @throws IOException
471          *             ioex
472          */
473         public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
474                         throws FileNotFoundException, IOException {
475                 FileReader reader = new FileReader(new File(producerFilePath));
476                 Properties props = new Properties();
477                 props.load(reader);
478                 return createBatchingPublisher(props, withResponse);
479         }
480
481         protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
482                         throws FileNotFoundException, IOException {
483                 assert props != null;
484                 MRSimplerBatchPublisher pub;
485                 if (withResponse) {
486                         pub = new MRSimplerBatchPublisher.Builder()
487                                         .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
488                                         .onTopic(props.getProperty("topic"))
489                                         .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
490                                                         Integer.parseInt(props.getProperty("maxAgeMs").toString()))
491                                         .compress(Boolean.parseBoolean(props.getProperty("compress")))
492                                         .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
493                                         .withResponse(withResponse).build();
494                 } else {
495                         pub = new MRSimplerBatchPublisher.Builder()
496                                         .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
497                                         .onTopic(props.getProperty("topic"))
498                                         .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
499                                                         Integer.parseInt(props.getProperty("maxAgeMs").toString()))
500                                         .compress(Boolean.parseBoolean(props.getProperty("compress")))
501                                         .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
502                 }
503                 pub.setHost(props.getProperty("host"));
504                 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
505
506                         pub.setAuthKey(props.getProperty("authKey"));
507                         pub.setAuthDate(props.getProperty("authDate"));
508                         pub.setUsername(props.getProperty("username"));
509                         pub.setPassword(props.getProperty("password"));
510                 } else {
511                         pub.setUsername(props.getProperty("username"));
512                         pub.setPassword(props.getProperty("password"));
513                 }
514                 pub.setProtocolFlag(props.getProperty("TransportType"));
515                 pub.setProps(props);
516                 prop = new Properties();
517                 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
518                         routeFilePath = props.getProperty("DME2preferredRouterFilePath");
519                         routeReader = new FileReader(new File(routeFilePath));
520                         File fo = new File(routeFilePath);
521                         if (!fo.exists()) {
522                                 routeWriter = new FileWriter(new File(routeFilePath));
523                         }
524                 }
525                 return pub;
526         }
527
528         /**
529          * Create an identity manager client to work with API keys.
530          * 
531          * @param hostSet
532          *            A set of hosts to be used in the URL to MR. Can be
533          *            "host:port". Use multiple entries to enable failover.
534          * @param apiKey
535          *            Your API key
536          * @param apiSecret
537          *            Your API secret
538          * @return an identity manager
539          */
540         public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
541                 MRIdentityManager cim;
542                 try {
543                         cim = new MRMetaClient(hostSet);
544                 } catch (MalformedURLException e) {
545                         throw new IllegalArgumentException(e);
546                 }
547                 cim.setApiCredentials(apiKey, apiSecret);
548                 return cim;
549         }
550
551         /**
552          * Create a topic manager for working with topics.
553          * 
554          * @param hostSet
555          *            A set of hosts to be used in the URL to MR. Can be
556          *            "host:port". Use multiple entries to enable failover.
557          * @param apiKey
558          *            Your API key
559          * @param apiSecret
560          *            Your API secret
561          * @return a topic manager
562          */
563         public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
564                 MRMetaClient tmi;
565                 try {
566                         tmi = new MRMetaClient(hostSet);
567                 } catch (MalformedURLException e) {
568                         throw new IllegalArgumentException(e);
569                 }
570                 tmi.setApiCredentials(apiKey, apiSecret);
571                 return tmi;
572         }
573
574         /**
575          * Inject a consumer. Used to support unit tests.
576          * 
577          * @param cc
578          */
579         public static void $testInject(MRConsumer cc) {
580                 MRClientBuilders.sfConsumerMock = cc;
581         }
582
583         public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
584                         String id, int i, int j, String protocalFlag, String consumerFilePath) {
585
586                 MRConsumerImpl sub;
587                 try {
588                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
589                 } catch (MalformedURLException e) {
590                         throw new IllegalArgumentException(e);
591                 }
592                 sub.setUsername(username);
593                 sub.setPassword(password);
594                 sub.setHost(host);
595                 sub.setProtocolFlag(protocalFlag);
596                 sub.setConsumerFilePath(consumerFilePath);
597                 return sub;
598
599         }
600
601         public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
602                         String id, String protocalFlag, String consumerFilePath, int i, int j) {
603
604                 MRConsumerImpl sub;
605                 try {
606                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
607                 } catch (MalformedURLException e) {
608                         throw new IllegalArgumentException(e);
609                 }
610                 sub.setUsername(username);
611                 sub.setPassword(password);
612                 sub.setHost(host);
613                 sub.setProtocolFlag(protocalFlag);
614                 sub.setConsumerFilePath(consumerFilePath);
615                 return sub;
616
617         }
618
619         public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
620                 FileReader reader = new FileReader(new File(consumerFilePath));
621                 Properties props = new Properties();
622                 props.load(reader);
623
624                 return createConsumer(props);
625         }
626
627         public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
628                 int timeout;
629                 ValidatorUtil.validateSubscriber(props);
630                 if (props.getProperty("timeout") != null)
631                         timeout = Integer.parseInt(props.getProperty("timeout"));
632                 else
633                         timeout = -1;
634                 int limit;
635                 if (props.getProperty("limit") != null)
636                         limit = Integer.parseInt(props.getProperty("limit"));
637                 else
638                         limit = -1;
639                 String group;
640                 if (props.getProperty("group") == null)
641                         group = UUID.randomUUID().toString();
642                 else
643                         group = props.getProperty("group");
644                 MRConsumerImpl sub = null;
645                 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
646                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
647                                         group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
648                                         props.getProperty("authKey"), props.getProperty("authDate"));
649                         sub.setAuthKey(props.getProperty("authKey"));
650                         sub.setAuthDate(props.getProperty("authDate"));
651                         sub.setUsername(props.getProperty("username"));
652                         sub.setPassword(props.getProperty("password"));
653                 } else {
654                         sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
655                                         group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
656                                         props.getProperty("username"), props.getProperty("password"));
657                         sub.setUsername(props.getProperty("username"));
658                         sub.setPassword(props.getProperty("password"));
659                 }
660                 sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
661                 sub.setProps(props);
662                 sub.setHost(props.getProperty("host"));
663                 sub.setProtocolFlag(props.getProperty("TransportType"));
664                 sub.setfFilter(props.getProperty("filter"));
665                 routeFilePath = props.getProperty("DME2preferredRouterFilePath");
666                 routeReader = new FileReader(new File(routeFilePath));
667                 prop = new Properties();
668                 File fo = new File(routeFilePath);
669                 if (!fo.exists()) {
670                         routeWriter = new FileWriter(new File(routeFilePath));
671                 }
672                 return sub;
673         }
674 }