Fix too many constructor param in dmaapclient
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  * ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *  
23  *******************************************************************************/
24 package org.onap.dmaap.mr.client;
25
26 import java.io.File;
27 import java.io.FileNotFoundException;
28 import java.io.FileReader;
29 import java.io.FileWriter;
30 import java.io.IOException;
31 import java.net.MalformedURLException;
32 import java.util.Collection;
33 import java.util.Map;
34 import java.util.Properties;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import javax.ws.rs.core.MultivaluedMap;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.impl.MRMetaClient;
40 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.dmaap.mr.tools.ValidatorUtil;
43
44 /**
45  * A factory for MR clients.<br/>
46  * <br/>
47  * Use caution selecting a consumer creator factory. If the call doesn't accept
48  * a consumer group name, then it creates a consumer that is not restartable.
49  * That is, if you stop your process and start it again, your client will NOT
50  * receive any missed messages on the topic. If you need to ensure receipt of
51  * missed messages, then you must use a consumer that's created with a group
52  * name and ID. (If you create multiple consumer processes using the same group,
53  * load is split across them. Be sure to use a different ID for each
54  * instance.)<br/>
55  * <br/>
56  * Publishers
57  * 
58  * @author author
59  */
60 public class MRClientFactory {
61     private static final String AUTH_KEY = "authKey";
62     private static final String AUTH_DATE = "authDate";
63     private static final String PASSWORD = "password";
64     private static final String USERNAME = "username";
65     private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
66     private static final String TOPIC = "topic";
67     private static final String TRANSPORT_TYPE = "TransportType";
68     public static MultivaluedMap<String, Object> HTTPHeadersMap;
69     public static Map<String, String> DME2HeadersMap;
70     public static String routeFilePath;
71
72     public static FileReader routeReader;
73
74     public static FileWriter routeWriter = null;
75     public static Properties prop = null;
76
77     /**
78      * Instantiates MRClientFactory.
79      */
80     private MRClientFactory() {
81         //prevents instantiation.
82     }
83     
84     /**
85      * Create a consumer instance with the default timeout and no limit on
86      * messages returned. This consumer operates as an independent consumer
87      * (i.e., not in a group) and is NOT re-startable across sessions.
88      * 
89      * @param hostList
90      *            A comma separated list of hosts to use to connect to MR. You
91      *            can include port numbers (3904 is the default). For example,
92      *            "hostname:8080,"
93      * 
94      * @param topic
95      *            The topic to consume
96      * 
97      * @return a consumer
98      */
99     public static MRConsumer createConsumer(String hostList, String topic) {
100         return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
101     }
102
103     /**
104      * Create a consumer instance with the default timeout and no limit on
105      * messages returned. This consumer operates as an independent consumer
106      * (i.e., not in a group) and is NOT re-startable across sessions.
107      * 
108      * @param hostSet
109      *            The host used in the URL to MR. Entries can be "host:port".
110      * @param topic
111      *            The topic to consume
112      * 
113      * @return a consumer
114      */
115     public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
116         return createConsumer(hostSet, topic, null);
117     }
118
119     /**
120      * Create a consumer instance with server-side filtering, the default
121      * timeout, and no limit on messages returned. This consumer operates as an
122      * independent consumer (i.e., not in a group) and is NOT re-startable
123      * across sessions.
124      * 
125      * @param hostSet
126      *            The host used in the URL to MR. Entries can be "host:port".
127      * @param topic
128      *            The topic to consume
129      * @param filter
130      *            a filter to use on the server side
131      * 
132      * @return a consumer
133      */
134     public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
135         return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
136     }
137
138     /**
139      * Create a consumer instance with the default timeout, and no limit on
140      * messages returned. This consumer can operate in a logical group and is
141      * re-startable across sessions when you use the same group and ID on
142      * restart.
143      * 
144      * @param hostSet
145      *            The host used in the URL to MR. Entries can be "host:port".
146      * @param topic
147      *            The topic to consume
148      * @param consumerGroup
149      *            The name of the consumer group this consumer is part of
150      * @param consumerId
151      *            The unique id of this consume in its group
152      * 
153      * @return a consumer
154      */
155     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
156             final String consumerId) {
157         return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
158     }
159
160     /**
161      * Create a consumer instance with the default timeout, and no limit on
162      * messages returned. This consumer can operate in a logical group and is
163      * re-startable across sessions when you use the same group and ID on
164      * restart.
165      * 
166      * @param hostSet
167      *            The host used in the URL to MR. Entries can be "host:port".
168      * @param topic
169      *            The topic to consume
170      * @param consumerGroup
171      *            The name of the consumer group this consumer is part of
172      * @param consumerId
173      *            The unique id of this consume in its group
174      * @param timeoutMs
175      *            The amount of time in milliseconds that the server should keep
176      *            the connection open while waiting for message traffic. Use -1
177      *            for default timeout.
178      * @param limit
179      *            A limit on the number of messages returned in a single call.
180      *            Use -1 for no limit.
181      * 
182      * @return a consumer
183      */
184     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
185             final String consumerId, int timeoutMs, int limit) {
186         return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
187     }
188
189     /**
190      * Create a consumer instance with the default timeout, and no limit on
191      * messages returned. This consumer can operate in a logical group and is
192      * re-startable across sessions when you use the same group and ID on
193      * restart. This consumer also uses server-side filtering.
194      * 
195      * @param hostList
196      *            A comma separated list of hosts to use to connect to MR. You
197      *            can include port numbers (3904 is the default)"
198      * @param topic
199      *            The topic to consume
200      * @param consumerGroup
201      *            The name of the consumer group this consumer is part of
202      * @param consumerId
203      *            The unique id of this consume in its group
204      * @param timeoutMs
205      *            The amount of time in milliseconds that the server should keep
206      *            the connection open while waiting for message traffic. Use -1
207      *            for default timeout.
208      * @param limit
209      *            A limit on the number of messages returned in a single call.
210      *            Use -1 for no limit.
211      * @param filter
212      *            A Highland Park filter expression using only built-in filter
213      *            components. Use null for "no filter".
214      * 
215      * @return a consumer
216      */
217     public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
218             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
219         return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
220                 filter, apiKey, apiSecret);
221     }
222
223     /**
224      * Create a consumer instance with the default timeout, and no limit on
225      * messages returned. This consumer can operate in a logical group and is
226      * re-startable across sessions when you use the same group and ID on
227      * restart. This consumer also uses server-side filtering.
228      * 
229      * @param hostSet
230      *            The host used in the URL to MR. Entries can be "host:port".
231      * @param topic
232      *            The topic to consume
233      * @param consumerGroup
234      *            The name of the consumer group this consumer is part of
235      * @param consumerId
236      *            The unique id of this consume in its group
237      * @param timeoutMs
238      *            The amount of time in milliseconds that the server should keep
239      *            the connection open while waiting for message traffic. Use -1
240      *            for default timeout.
241      * @param limit
242      *            A limit on the number of messages returned in a single call.
243      *            Use -1 for no limit.
244      * @param filter
245      *            A Highland Park filter expression using only built-in filter
246      *            components. Use null for "no filter".
247      * 
248      * @return a consumer
249      */
250     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
251             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
252         if (MRClientBuilders.sfConsumerMock != null)
253             return MRClientBuilders.sfConsumerMock;
254         try {
255             return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
256                     .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
257                     .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
258                     .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
259                     .createMRConsumerImpl();
260         } catch (MalformedURLException e) {
261             throw new IllegalArgumentException(e);
262         }
263     }
264
265     /*************************************************************************/
266     /*************************************************************************/
267     /*************************************************************************/
268
269     /**
270      * Create a publisher that sends each message (or group of messages)
271      * immediately. Most applications should favor higher latency for much
272      * higher message throughput and the "simple publisher" is not a good
273      * choice.
274      * 
275      * @param hostlist
276      *            The host used in the URL to MR. Can be "host:port", can be
277      *            multiple comma-separated entries.
278      * @param topic
279      *            The topic on which to publish messages.
280      * @return a publisher
281      */
282     public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
283         return createBatchingPublisher(hostlist, topic, 1, 1);
284     }
285
286     /**
287      * Create a publisher that batches messages. Be sure to close the publisher
288      * to send the last batch and ensure a clean shutdown. Message payloads are
289      * not compressed.
290      * 
291      * @param hostlist
292      *            The host used in the URL to MR. Can be "host:port", can be
293      *            multiple comma-separated entries.
294      * @param topic
295      *            The topic on which to publish messages.
296      * @param maxBatchSize
297      *            The largest set of messages to batch
298      * @param maxAgeMs
299      *            The maximum age of a message waiting in a batch
300      * 
301      * @return a publisher
302      */
303     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
304             long maxAgeMs) {
305         return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
306     }
307
308     /**
309      * Create a publisher that batches messages. Be sure to close the publisher
310      * to send the last batch and ensure a clean shutdown.
311      * 
312      * @param hostlist
313      *            The host used in the URL to MR. Can be "host:port", can be
314      *            multiple comma-separated entries.
315      * @param topic
316      *            The topic on which to publish messages.
317      * @param maxBatchSize
318      *            The largest set of messages to batch
319      * @param maxAgeMs
320      *            The maximum age of a message waiting in a batch
321      * @param compress
322      *            use gzip compression
323      * 
324      * @return a publisher
325      */
326     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
327             long maxAgeMs, boolean compress) {
328         return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
329     }
330
331     /**
332      * Create a publisher that batches messages. Be sure to close the publisher
333      * to send the last batch and ensure a clean shutdown.
334      * 
335      * @param hostSet
336      *            A set of hosts to be used in the URL to MR. Can be
337      *            "host:port". Use multiple entries to enable failover.
338      * @param topic
339      *            The topic on which to publish messages.
340      * @param maxBatchSize
341      *            The largest set of messages to batch
342      * @param maxAgeMs
343      *            The maximum age of a message waiting in a batch
344      * @param compress
345      *            use gzip compression
346      * 
347      * @return a publisher
348      */
349     public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
350             long maxAgeMs, boolean compress) {
351         final TreeSet<String> hosts = new TreeSet<>();
352         for (String hp : hostSet) {
353             hosts.add(hp);
354         }
355         return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
356     }
357
358     /**
359      * Create a publisher that batches messages. Be sure to close the publisher
360      * to send the last batch and ensure a clean shutdown.
361      * 
362      * @param hostSet
363      *            A set of hosts to be used in the URL to MR. Can be
364      *            "host:port". Use multiple entries to enable failover.
365      * @param topic
366      *            The topic on which to publish messages.
367      * @param maxBatchSize
368      *            The largest set of messages to batch
369      * @param maxAgeMs
370      *            The maximum age of a message waiting in a batch
371      * @param compress
372      *            use gzip compression
373      * 
374      * @return a publisher
375      */
376     public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
377             int maxBatchSize, long maxAgeMs, boolean compress) {
378         return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
379                 .compress(compress).build();
380     }
381
382     /**
383      * Create a publisher that batches messages. Be sure to close the publisher
384      * to send the last batch and ensure a clean shutdown.
385      * 
386      * @param host
387      *            A host to be used in the URL to MR. Can be "host:port". Use
388      *            multiple entries to enable failover.
389      * @param topic
390      *            The topic on which to publish messages.
391      * @param username
392      *            username
393      * @param password
394      *            password
395      * @param maxBatchSize
396      *            The largest set of messages to batch
397      * @param maxAgeMs
398      *            The maximum age of a message waiting in a batch
399      * @param compress
400      *            use gzip compression
401      * @param protocolFlag
402      *            http auth or ueb auth or dme2 method
403      * @param producerFilePath
404      *            all properties for publisher
405      * @return MRBatchingPublisher obj
406      */
407     public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
408             final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
409         MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
410                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
411                 .compress(compress).build();
412
413         pub.setHost(host);
414         pub.setUsername(username);
415         pub.setPassword(password);
416         pub.setProtocolFlag(protocolFlag);
417         return pub;
418     }
419
420     /**
421      * Create a publisher that batches messages. Be sure to close the publisher
422      * to send the last batch and ensure a clean shutdown
423      * 
424      * @param Properties
425      *            props set all properties for publishing message
426      * @return MRBatchingPublisher obj
427      * @throws FileNotFoundException
428      *             exc
429      * @throws IOException
430      *             ioex
431      */
432     public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
433             throws FileNotFoundException, IOException {
434         return createInternalBatchingPublisher(props, withResponse);
435     }
436
437     /**
438      * Create a publisher that batches messages. Be sure to close the publisher
439      * to send the last batch and ensure a clean shutdown
440      * 
441      * @param Properties
442      *            props set all properties for publishing message
443      * @return MRBatchingPublisher obj
444      * @throws FileNotFoundException
445      *             exc
446      * @throws IOException
447      *             ioex
448      */
449     public static MRBatchingPublisher createBatchingPublisher(Properties props)
450             throws FileNotFoundException, IOException {
451         return createInternalBatchingPublisher(props, false);
452     }
453
454     /**
455      * Create a publisher that batches messages. Be sure to close the publisher
456      * to send the last batch and ensure a clean shutdown
457      * 
458      * @param producerFilePath
459      *            set all properties for publishing message
460      * @return MRBatchingPublisher obj
461      * @throws FileNotFoundException
462      *             exc
463      * @throws IOException
464      *             ioex
465      */
466     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
467             throws FileNotFoundException, IOException {
468         FileReader reader = new FileReader(new File(producerFilePath));
469         Properties props = new Properties();
470         props.load(reader);
471         return createBatchingPublisher(props);
472     }
473
474     /**
475      * Create a publisher that will contain send methods that return response
476      * object to user.
477      * 
478      * @param producerFilePath
479      *            set all properties for publishing message
480      * @return MRBatchingPublisher obj
481      * @throws FileNotFoundException
482      *             exc
483      * @throws IOException
484      *             ioex
485      */
486     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
487             throws FileNotFoundException, IOException {
488         FileReader reader = new FileReader(new File(producerFilePath));
489         Properties props = new Properties();
490         props.load(reader);
491         return createBatchingPublisher(props, withResponse);
492     }
493
494     protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
495             throws FileNotFoundException, IOException {
496         assert props != null;
497         MRSimplerBatchPublisher pub;
498         if (withResponse) {
499             pub = new MRSimplerBatchPublisher.Builder()
500                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
501                     .onTopic(props.getProperty(TOPIC))
502                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
503                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
504                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
505                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
506                     .withResponse(withResponse).build();
507         } else {
508             pub = new MRSimplerBatchPublisher.Builder()
509                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
510                     .onTopic(props.getProperty(TOPIC))
511                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
512                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
513                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
514                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
515         }
516         pub.setHost(props.getProperty("host"));
517         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
518
519             pub.setAuthKey(props.getProperty(AUTH_KEY));
520             pub.setAuthDate(props.getProperty(AUTH_DATE));
521             pub.setUsername(props.getProperty(USERNAME));
522             pub.setPassword(props.getProperty(PASSWORD));
523         } else {
524             pub.setUsername(props.getProperty(USERNAME));
525             pub.setPassword(props.getProperty(PASSWORD));
526         }
527         pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
528         pub.setProps(props);
529         prop = new Properties();
530         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
531             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
532             routeReader = new FileReader(new File(routeFilePath));
533             File fo = new File(routeFilePath);
534             if (!fo.exists()) {
535                 routeWriter = new FileWriter(new File(routeFilePath));
536             }
537         }
538         return pub;
539     }
540
541     /**
542      * Create an identity manager client to work with API keys.
543      * 
544      * @param hostSet
545      *            A set of hosts to be used in the URL to MR. Can be
546      *            "host:port". Use multiple entries to enable failover.
547      * @param apiKey
548      *            Your API key
549      * @param apiSecret
550      *            Your API secret
551      * @return an identity manager
552      */
553     public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
554         MRIdentityManager cim;
555         try {
556             cim = new MRMetaClient(hostSet);
557         } catch (MalformedURLException e) {
558             throw new IllegalArgumentException(e);
559         }
560         cim.setApiCredentials(apiKey, apiSecret);
561         return cim;
562     }
563
564     /**
565      * Create a topic manager for working with topics.
566      * 
567      * @param hostSet
568      *            A set of hosts to be used in the URL to MR. Can be
569      *            "host:port". Use multiple entries to enable failover.
570      * @param apiKey
571      *            Your API key
572      * @param apiSecret
573      *            Your API secret
574      * @return a topic manager
575      */
576     public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
577         MRMetaClient tmi;
578         try {
579             tmi = new MRMetaClient(hostSet);
580         } catch (MalformedURLException e) {
581             throw new IllegalArgumentException(e);
582         }
583         tmi.setApiCredentials(apiKey, apiSecret);
584         return tmi;
585     }
586
587     /**
588      * Inject a consumer. Used to support unit tests.
589      * 
590      * @param cc
591      */
592     public static void $testInject(MRConsumer cc) {
593         MRClientBuilders.sfConsumerMock = cc;
594     }
595
596     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
597             String id, int i, int j, String protocalFlag, String consumerFilePath) {
598
599         MRConsumerImpl sub;
600         try {
601             sub = new MRConsumerImpl.MRConsumerImplBuilder()
602                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
603                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
604                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
605                     .createMRConsumerImpl();
606         } catch (MalformedURLException e) {
607             throw new IllegalArgumentException(e);
608         }
609         sub.setUsername(username);
610         sub.setPassword(password);
611         sub.setHost(host);
612         sub.setProtocolFlag(protocalFlag);
613         sub.setConsumerFilePath(consumerFilePath);
614         return sub;
615
616     }
617
618     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
619             String id, String protocalFlag, String consumerFilePath, int i, int j) {
620
621         MRConsumerImpl sub;
622         try {
623             sub = new MRConsumerImpl.MRConsumerImplBuilder()
624                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
625                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
626                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
627                     .createMRConsumerImpl();
628         } catch (MalformedURLException e) {
629             throw new IllegalArgumentException(e);
630         }
631         sub.setUsername(username);
632         sub.setPassword(password);
633         sub.setHost(host);
634         sub.setProtocolFlag(protocalFlag);
635         sub.setConsumerFilePath(consumerFilePath);
636         return sub;
637
638     }
639
640     public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
641         FileReader reader = new FileReader(new File(consumerFilePath));
642         Properties props = new Properties();
643         props.load(reader);
644
645         return createConsumer(props);
646     }
647
648     public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
649         int timeout;
650         ValidatorUtil.validateSubscriber(props);
651         if (props.getProperty("timeout") != null)
652             timeout = Integer.parseInt(props.getProperty("timeout"));
653         else
654             timeout = -1;
655         int limit;
656         if (props.getProperty("limit") != null)
657             limit = Integer.parseInt(props.getProperty("limit"));
658         else
659             limit = -1;
660         String group;
661         if (props.getProperty("group") == null)
662             group = UUID.randomUUID().toString();
663         else
664             group = props.getProperty("group");
665         MRConsumerImpl sub = null;
666         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
667             sub = new MRConsumerImpl.MRConsumerImplBuilder()
668                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host")))
669                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
670                     .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
671                     .setFilter(props.getProperty("filter"))
672                     .setApiKey_username(props.getProperty(AUTH_KEY))
673                     .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
674             sub.setAuthKey(props.getProperty(AUTH_KEY));
675             sub.setAuthDate(props.getProperty(AUTH_DATE));
676             sub.setUsername(props.getProperty(USERNAME));
677             sub.setPassword(props.getProperty(PASSWORD));
678         } else {
679             sub = new MRConsumerImpl.MRConsumerImplBuilder()
680                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host")))
681                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
682                     .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
683                     .setFilter(props.getProperty("filter"))
684                     .setApiKey_username(props.getProperty(USERNAME))
685                     .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
686             sub.setUsername(props.getProperty(USERNAME));
687             sub.setPassword(props.getProperty(PASSWORD));
688         }
689         
690         sub.setProps(props);
691         sub.setHost(props.getProperty("host"));
692         sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
693         sub.setfFilter(props.getProperty("filter"));
694         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
695             MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
696             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
697             routeReader = new FileReader(new File(routeFilePath));
698             prop = new Properties();
699             File fo = new File(routeFilePath);
700                 if (!fo.exists()) {
701                     routeWriter = new FileWriter(new File(routeFilePath));
702                 }
703         }
704         
705         return sub;
706     }
707 }