First sonar issues review
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  * ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *  
23  *******************************************************************************/
24 package org.onap.dmaap.mr.client;
25
26 import java.io.*;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
29 import java.util.Map;
30 import java.util.Properties;
31 import java.util.TreeSet;
32 import java.util.UUID;
33 import javax.ws.rs.core.MultivaluedMap;
34 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
35 import org.onap.dmaap.mr.client.impl.MRMetaClient;
36 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
37 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
38 import org.onap.dmaap.mr.tools.ValidatorUtil;
39
40 /**
41  * A factory for MR clients.<br/>
42  * <br/>
43  * Use caution selecting a consumer creator factory. If the call doesn't accept
44  * a consumer group name, then it creates a consumer that is not restartable.
45  * That is, if you stop your process and start it again, your client will NOT
46  * receive any missed messages on the topic. If you need to ensure receipt of
47  * missed messages, then you must use a consumer that's created with a group
48  * name and ID. (If you create multiple consumer processes using the same group,
49  * load is split across them. Be sure to use a different ID for each
50  * instance.)<br/>
51  * <br/>
52  * Publishers
53  * 
54  * @author author
55  */
56 public class MRClientFactory {
57     private static final String AUTH_KEY = "authKey";
58     private static final String AUTH_DATE = "authDate";
59     private static final String PASSWORD = "password";
60     private static final String USERNAME = "username";
61     private static final String FILTER = "filter";
62     private static final String HOST = "host";
63     private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
64     private static final String TOPIC = "topic";
65     private static final String TRANSPORT_TYPE = "TransportType";
66
67     private static MultivaluedMap<String, Object> httpHeadersMap;
68     public static Map<String, String> DME2HeadersMap;
69     public static String routeFilePath;
70
71     public static FileReader routeReader;
72
73     public static FileWriter routeWriter = null;
74     public static Properties prop = null;
75
76     /**
77      * Instantiates MRClientFactory.
78      */
79     private MRClientFactory() {
80         //prevents instantiation.
81     }
82
83     /**
84      * Add getter to avoid direct access to static header map.
85      * @return
86      */
87     public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
88         return httpHeadersMap;
89     }
90
91     /**
92      * Add setter to avoid direct access to static header map.
93      * @param headers
94      */
95     public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
96         httpHeadersMap = headers;
97     }
98
99     /**
100      * Create a consumer instance with the default timeout and no limit on
101      * messages returned. This consumer operates as an independent consumer
102      * (i.e., not in a group) and is NOT re-startable across sessions.
103      * 
104      * @param hostList
105      *            A comma separated list of hosts to use to connect to MR. You
106      *            can include port numbers (3904 is the default). For example,
107      *            "hostname:8080,"
108      * 
109      * @param topic
110      *            The topic to consume
111      * 
112      * @return a consumer
113      */
114     public static MRConsumer createConsumer(String hostList, String topic) {
115         return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
116     }
117
118     /**
119      * Create a consumer instance with the default timeout and no limit on
120      * messages returned. This consumer operates as an independent consumer
121      * (i.e., not in a group) and is NOT re-startable across sessions.
122      * 
123      * @param hostSet
124      *            The host used in the URL to MR. Entries can be "host:port".
125      * @param topic
126      *            The topic to consume
127      * 
128      * @return a consumer
129      */
130     public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
131         return createConsumer(hostSet, topic, null);
132     }
133
134     /**
135      * Create a consumer instance with server-side filtering, the default
136      * timeout, and no limit on messages returned. This consumer operates as an
137      * independent consumer (i.e., not in a group) and is NOT re-startable
138      * across sessions.
139      * 
140      * @param hostSet
141      *            The host used in the URL to MR. Entries can be "host:port".
142      * @param topic
143      *            The topic to consume
144      * @param filter
145      *            a filter to use on the server side
146      * 
147      * @return a consumer
148      */
149     public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
150         return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
151     }
152
153     /**
154      * Create a consumer instance with the default timeout, and no limit on
155      * messages returned. This consumer can operate in a logical group and is
156      * re-startable across sessions when you use the same group and ID on
157      * restart.
158      * 
159      * @param hostSet
160      *            The host used in the URL to MR. Entries can be "host:port".
161      * @param topic
162      *            The topic to consume
163      * @param consumerGroup
164      *            The name of the consumer group this consumer is part of
165      * @param consumerId
166      *            The unique id of this consume in its group
167      * 
168      * @return a consumer
169      */
170     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
171             final String consumerId) {
172         return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
173     }
174
175     /**
176      * Create a consumer instance with the default timeout, and no limit on
177      * messages returned. This consumer can operate in a logical group and is
178      * re-startable across sessions when you use the same group and ID on
179      * restart.
180      * 
181      * @param hostSet
182      *            The host used in the URL to MR. Entries can be "host:port".
183      * @param topic
184      *            The topic to consume
185      * @param consumerGroup
186      *            The name of the consumer group this consumer is part of
187      * @param consumerId
188      *            The unique id of this consume in its group
189      * @param timeoutMs
190      *            The amount of time in milliseconds that the server should keep
191      *            the connection open while waiting for message traffic. Use -1
192      *            for default timeout.
193      * @param limit
194      *            A limit on the number of messages returned in a single call.
195      *            Use -1 for no limit.
196      * 
197      * @return a consumer
198      */
199     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
200             final String consumerId, int timeoutMs, int limit) {
201         return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
202     }
203
204     /**
205      * Create a consumer instance with the default timeout, and no limit on
206      * messages returned. This consumer can operate in a logical group and is
207      * re-startable across sessions when you use the same group and ID on
208      * restart. This consumer also uses server-side filtering.
209      * 
210      * @param hostList
211      *            A comma separated list of hosts to use to connect to MR. You
212      *            can include port numbers (3904 is the default)"
213      * @param topic
214      *            The topic to consume
215      * @param consumerGroup
216      *            The name of the consumer group this consumer is part of
217      * @param consumerId
218      *            The unique id of this consume in its group
219      * @param timeoutMs
220      *            The amount of time in milliseconds that the server should keep
221      *            the connection open while waiting for message traffic. Use -1
222      *            for default timeout.
223      * @param limit
224      *            A limit on the number of messages returned in a single call.
225      *            Use -1 for no limit.
226      * @param filter
227      *            A Highland Park filter expression using only built-in filter
228      *            components. Use null for "no filter".
229      * 
230      * @return a consumer
231      */
232     public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
233             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
234         return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
235                 filter, apiKey, apiSecret);
236     }
237
238     /**
239      * Create a consumer instance with the default timeout, and no limit on
240      * messages returned. This consumer can operate in a logical group and is
241      * re-startable across sessions when you use the same group and ID on
242      * restart. This consumer also uses server-side filtering.
243      * 
244      * @param hostSet
245      *            The host used in the URL to MR. Entries can be "host:port".
246      * @param topic
247      *            The topic to consume
248      * @param consumerGroup
249      *            The name of the consumer group this consumer is part of
250      * @param consumerId
251      *            The unique id of this consume in its group
252      * @param timeoutMs
253      *            The amount of time in milliseconds that the server should keep
254      *            the connection open while waiting for message traffic. Use -1
255      *            for default timeout.
256      * @param limit
257      *            A limit on the number of messages returned in a single call.
258      *            Use -1 for no limit.
259      * @param filter
260      *            A Highland Park filter expression using only built-in filter
261      *            components. Use null for "no filter".
262      * 
263      * @return a consumer
264      */
265     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
266             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
267         if (MRClientBuilders.sfConsumerMock != null)
268             return MRClientBuilders.sfConsumerMock;
269         try {
270             return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
271                     .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
272                     .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
273                     .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
274                     .createMRConsumerImpl();
275         } catch (MalformedURLException e) {
276             throw new IllegalArgumentException(e);
277         }
278     }
279
280     /*************************************************************************/
281     /*************************************************************************/
282     /*************************************************************************/
283
284     /**
285      * Create a publisher that sends each message (or group of messages)
286      * immediately. Most applications should favor higher latency for much
287      * higher message throughput and the "simple publisher" is not a good
288      * choice.
289      * 
290      * @param hostlist
291      *            The host used in the URL to MR. Can be "host:port", can be
292      *            multiple comma-separated entries.
293      * @param topic
294      *            The topic on which to publish messages.
295      * @return a publisher
296      */
297     public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
298         return createBatchingPublisher(hostlist, topic, 1, 1);
299     }
300
301     /**
302      * Create a publisher that batches messages. Be sure to close the publisher
303      * to send the last batch and ensure a clean shutdown. Message payloads are
304      * not compressed.
305      * 
306      * @param hostlist
307      *            The host used in the URL to MR. Can be "host:port", can be
308      *            multiple comma-separated entries.
309      * @param topic
310      *            The topic on which to publish messages.
311      * @param maxBatchSize
312      *            The largest set of messages to batch
313      * @param maxAgeMs
314      *            The maximum age of a message waiting in a batch
315      * 
316      * @return a publisher
317      */
318     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
319             long maxAgeMs) {
320         return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
321     }
322
323     /**
324      * Create a publisher that batches messages. Be sure to close the publisher
325      * to send the last batch and ensure a clean shutdown.
326      * 
327      * @param hostlist
328      *            The host used in the URL to MR. Can be "host:port", can be
329      *            multiple comma-separated entries.
330      * @param topic
331      *            The topic on which to publish messages.
332      * @param maxBatchSize
333      *            The largest set of messages to batch
334      * @param maxAgeMs
335      *            The maximum age of a message waiting in a batch
336      * @param compress
337      *            use gzip compression
338      * 
339      * @return a publisher
340      */
341     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
342             long maxAgeMs, boolean compress) {
343         return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
344     }
345
346     /**
347      * Create a publisher that batches messages. Be sure to close the publisher
348      * to send the last batch and ensure a clean shutdown.
349      * 
350      * @param hostSet
351      *            A set of hosts to be used in the URL to MR. Can be
352      *            "host:port". Use multiple entries to enable failover.
353      * @param topic
354      *            The topic on which to publish messages.
355      * @param maxBatchSize
356      *            The largest set of messages to batch
357      * @param maxAgeMs
358      *            The maximum age of a message waiting in a batch
359      * @param compress
360      *            use gzip compression
361      * 
362      * @return a publisher
363      */
364     public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
365             long maxAgeMs, boolean compress) {
366         final TreeSet<String> hosts = new TreeSet<>();
367         for (String hp : hostSet) {
368             hosts.add(hp);
369         }
370         return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
371     }
372
373     /**
374      * Create a publisher that batches messages. Be sure to close the publisher
375      * to send the last batch and ensure a clean shutdown.
376      * 
377      * @param hostSet
378      *            A set of hosts to be used in the URL to MR. Can be
379      *            "host:port". Use multiple entries to enable failover.
380      * @param topic
381      *            The topic on which to publish messages.
382      * @param maxBatchSize
383      *            The largest set of messages to batch
384      * @param maxAgeMs
385      *            The maximum age of a message waiting in a batch
386      * @param compress
387      *            use gzip compression
388      * 
389      * @return a publisher
390      */
391     public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
392             int maxBatchSize, long maxAgeMs, boolean compress) {
393         return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
394                 .compress(compress).build();
395     }
396
397     /**
398      * Create a publisher that batches messages. Be sure to close the publisher
399      * to send the last batch and ensure a clean shutdown.
400      * 
401      * @param host
402      *            A host to be used in the URL to MR. Can be "host:port". Use
403      *            multiple entries to enable failover.
404      * @param topic
405      *            The topic on which to publish messages.
406      * @param username
407      *            username
408      * @param password
409      *            password
410      * @param maxBatchSize
411      *            The largest set of messages to batch
412      * @param maxAgeMs
413      *            The maximum age of a message waiting in a batch
414      * @param compress
415      *            use gzip compression
416      * @param protocolFlag
417      *            http auth or ueb auth or dme2 method
418      * @return MRBatchingPublisher obj
419      */
420     public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
421             final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
422         MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
423                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
424                 .compress(compress).build();
425
426         pub.setHost(host);
427         pub.setUsername(username);
428         pub.setPassword(password);
429         pub.setProtocolFlag(protocolFlag);
430         return pub;
431     }
432
433     /**
434      * Create a publisher that batches messages. Be sure to close the publisher
435      * to send the last batch and ensure a clean shutdown
436      * 
437      * @param props
438      *            props set all properties for publishing message
439      * @return MRBatchingPublisher obj
440      * @throws FileNotFoundException
441      *             exc
442      * @throws IOException
443      *             ioex
444      */
445     public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
446             throws FileNotFoundException, IOException {
447         return createInternalBatchingPublisher(props, withResponse);
448     }
449
450     /**
451      * Create a publisher that batches messages. Be sure to close the publisher
452      * to send the last batch and ensure a clean shutdown
453      * 
454      * @param props
455      *            props set all properties for publishing message
456      * @return MRBatchingPublisher obj
457      * @throws FileNotFoundException
458      *             exc
459      * @throws IOException
460      *             ioex
461      */
462     public static MRBatchingPublisher createBatchingPublisher(Properties props)
463             throws FileNotFoundException, IOException {
464         return createInternalBatchingPublisher(props, false);
465     }
466
467     /**
468      * Create a publisher that batches messages. Be sure to close the publisher
469      * to send the last batch and ensure a clean shutdown
470      * 
471      * @param producerFilePath
472      *            set all properties for publishing message
473      * @return MRBatchingPublisher obj
474      * @throws FileNotFoundException
475      *             exc
476      * @throws IOException
477      *             ioex
478      */
479     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
480             throws FileNotFoundException, IOException {
481         Properties props = new Properties();
482         try(InputStream input = new FileInputStream(producerFilePath)) {
483             props.load(input);
484         }
485         return createBatchingPublisher(props);
486     }
487
488     /**
489      * Create a publisher that will contain send methods that return response
490      * object to user.
491      * 
492      * @param producerFilePath
493      *            set all properties for publishing message
494      * @return MRBatchingPublisher obj
495      * @throws FileNotFoundException
496      *             exc
497      * @throws IOException
498      *             ioex
499      */
500     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
501             throws FileNotFoundException, IOException {
502         Properties props = new Properties();
503         try(InputStream input = new FileInputStream(producerFilePath)) {
504             props.load(input);
505         }
506         return createBatchingPublisher(props, withResponse);
507     }
508
509     protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
510             throws FileNotFoundException, IOException {
511         assert props != null;
512         MRSimplerBatchPublisher pub;
513         if (withResponse) {
514             pub = new MRSimplerBatchPublisher.Builder()
515                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
516                     .onTopic(props.getProperty(TOPIC))
517                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
518                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
519                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
520                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
521                     .withResponse(withResponse).build();
522         } else {
523             pub = new MRSimplerBatchPublisher.Builder()
524                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
525                     .onTopic(props.getProperty(TOPIC))
526                     .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
527                             Integer.parseInt(props.getProperty("maxAgeMs").toString()))
528                     .compress(Boolean.parseBoolean(props.getProperty("compress")))
529                     .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
530         }
531         pub.setHost(props.getProperty(HOST));
532         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
533
534             pub.setAuthKey(props.getProperty(AUTH_KEY));
535             pub.setAuthDate(props.getProperty(AUTH_DATE));
536             pub.setUsername(props.getProperty(USERNAME));
537             pub.setPassword(props.getProperty(PASSWORD));
538         } else {
539             pub.setUsername(props.getProperty(USERNAME));
540             pub.setPassword(props.getProperty(PASSWORD));
541         }
542         pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
543         pub.setProps(props);
544         prop = new Properties();
545         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
546             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
547             routeReader = new FileReader(new File(routeFilePath));
548             File fo = new File(routeFilePath);
549             if (!fo.exists()) {
550                 routeWriter = new FileWriter(new File(routeFilePath));
551             }
552         }
553         return pub;
554     }
555
556     /**
557      * Create an identity manager client to work with API keys.
558      * 
559      * @param hostSet
560      *            A set of hosts to be used in the URL to MR. Can be
561      *            "host:port". Use multiple entries to enable failover.
562      * @param apiKey
563      *            Your API key
564      * @param apiSecret
565      *            Your API secret
566      * @return an identity manager
567      */
568     public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
569         MRIdentityManager cim;
570         try {
571             cim = new MRMetaClient(hostSet);
572         } catch (MalformedURLException e) {
573             throw new IllegalArgumentException(e);
574         }
575         cim.setApiCredentials(apiKey, apiSecret);
576         return cim;
577     }
578
579     /**
580      * Create a topic manager for working with topics.
581      * 
582      * @param hostSet
583      *            A set of hosts to be used in the URL to MR. Can be
584      *            "host:port". Use multiple entries to enable failover.
585      * @param apiKey
586      *            Your API key
587      * @param apiSecret
588      *            Your API secret
589      * @return a topic manager
590      */
591     public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
592         MRMetaClient tmi;
593         try {
594             tmi = new MRMetaClient(hostSet);
595         } catch (MalformedURLException e) {
596             throw new IllegalArgumentException(e);
597         }
598         tmi.setApiCredentials(apiKey, apiSecret);
599         return tmi;
600     }
601
602     /**
603      * Inject a consumer. Used to support unit tests.
604      * 
605      * @param cc
606      */
607     public static void $testInject(MRConsumer cc) {
608         MRClientBuilders.sfConsumerMock = cc;
609     }
610
611     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
612             String id, int i, int j, String protocalFlag, String consumerFilePath) {
613
614         MRConsumerImpl sub;
615         try {
616             sub = new MRConsumerImpl.MRConsumerImplBuilder()
617                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
618                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
619                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
620                     .createMRConsumerImpl();
621         } catch (MalformedURLException e) {
622             throw new IllegalArgumentException(e);
623         }
624         sub.setUsername(username);
625         sub.setPassword(password);
626         sub.setHost(host);
627         sub.setProtocolFlag(protocalFlag);
628         sub.setConsumerFilePath(consumerFilePath);
629         return sub;
630
631     }
632
633     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
634             String id, String protocalFlag, String consumerFilePath, int i, int j) {
635
636         MRConsumerImpl sub;
637         try {
638             sub = new MRConsumerImpl.MRConsumerImplBuilder()
639                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
640                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
641                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
642                     .createMRConsumerImpl();
643         } catch (MalformedURLException e) {
644             throw new IllegalArgumentException(e);
645         }
646         sub.setUsername(username);
647         sub.setPassword(password);
648         sub.setHost(host);
649         sub.setProtocolFlag(protocalFlag);
650         sub.setConsumerFilePath(consumerFilePath);
651         return sub;
652
653     }
654
655     public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
656         Properties props = new Properties();
657         try(InputStream input = new FileInputStream(consumerFilePath)) {
658             props.load(input);
659         }
660         return createConsumer(props);
661     }
662
663     public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
664         int timeout;
665         ValidatorUtil.validateSubscriber(props);
666         if (props.getProperty("timeout") != null)
667             timeout = Integer.parseInt(props.getProperty("timeout"));
668         else
669             timeout = -1;
670         int limit;
671         if (props.getProperty("limit") != null)
672             limit = Integer.parseInt(props.getProperty("limit"));
673         else
674             limit = -1;
675         String group;
676         if (props.getProperty("group") == null)
677             group = UUID.randomUUID().toString();
678         else
679             group = props.getProperty("group");
680         MRConsumerImpl sub = null;
681         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
682             sub = new MRConsumerImpl.MRConsumerImplBuilder()
683                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
684                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
685                     .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
686                     .setFilter(props.getProperty(FILTER))
687                     .setApiKey_username(props.getProperty(AUTH_KEY))
688                     .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
689             sub.setAuthKey(props.getProperty(AUTH_KEY));
690             sub.setAuthDate(props.getProperty(AUTH_DATE));
691             sub.setUsername(props.getProperty(USERNAME));
692             sub.setPassword(props.getProperty(PASSWORD));
693         } else {
694             sub = new MRConsumerImpl.MRConsumerImplBuilder()
695                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
696                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
697                     .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
698                     .setFilter(props.getProperty(FILTER))
699                     .setApiKey_username(props.getProperty(USERNAME))
700                     .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
701             sub.setUsername(props.getProperty(USERNAME));
702             sub.setPassword(props.getProperty(PASSWORD));
703         }
704         
705         sub.setProps(props);
706         sub.setHost(props.getProperty(HOST));
707         sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
708         sub.setfFilter(props.getProperty(FILTER));
709         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
710             MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
711             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
712             routeReader = new FileReader(new File(routeFilePath));
713             prop = new Properties();
714             File fo = new File(routeFilePath);
715                 if (!fo.exists()) {
716                     routeWriter = new FileWriter(new File(routeFilePath));
717                 }
718         }
719         
720         return sub;
721     }
722 }