[DMAAP-CLIENT] Release 1.1.14
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  *   Modifications Copyright © 2021 Orange.
9  *  ================================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *        http://www.apache.org/licenses/LICENSE-2.0
14  *
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *  ============LICENSE_END=========================================================
21  *
22  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  *
24  *******************************************************************************/
25
26 package org.onap.dmaap.mr.client;
27
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.FileNotFoundException;
31 import java.io.FileReader;
32 import java.io.FileWriter;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.MalformedURLException;
36 import java.util.Collection;
37 import java.util.Collections;
38 import java.util.Map;
39 import java.util.Properties;
40 import java.util.TreeSet;
41 import java.util.UUID;
42 import javax.ws.rs.core.MultivaluedMap;
43
44 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
45 import org.onap.dmaap.mr.client.impl.MRMetaClient;
46 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
47 import org.onap.dmaap.mr.tools.ValidatorUtil;
48
49 /**
50  * A factory for MR clients.<br/>
51  * <br/>
52  * Use caution selecting a consumer creator factory. If the call doesn't accept
53  * a consumer group name, then it creates a consumer that is not restartable.
54  * That is, if you stop your process and start it again, your client will NOT
55  * receive any missed messages on the topic. If you need to ensure receipt of
56  * missed messages, then you must use a consumer that's created with a group
57  * name and ID. (If you create multiple consumer processes using the same group,
58  * load is split across them. Be sure to use a different ID for each
59  * instance.)<br/>
60  * <br/>
61  * Publishers
62  *
63  * @author author
64  */
65 public class MRClientFactory {
66
67     private static MultivaluedMap<String, Object> httpHeadersMap;
68     public static Map<String, String> DME2HeadersMap;
69     public static String routeFilePath;
70
71     public static FileReader routeReader;
72
73     public static FileWriter routeWriter = null;
74     public static Properties prop = null;
75
76     /**
77      * Instantiates MRClientFactory.
78      */
79     private MRClientFactory() {
80         //prevents instantiation.
81     }
82
83     /**
84      * Add getter to avoid direct access to static header map.
85      *
86      * @return
87      */
88     public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
89         return httpHeadersMap;
90     }
91
92     /**
93      * Add setter to avoid direct access to static header map.
94      *
95      * @param headers
96      */
97     public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
98         httpHeadersMap = headers;
99     }
100
101     /**
102      * Create a consumer instance with the default timeout and no limit on
103      * messages returned. This consumer operates as an independent consumer
104      * (i.e., not in a group) and is NOT re-startable across sessions.
105      *
106      * @param hostList A comma separated list of hosts to use to connect to MR. You
107      *                 can include port numbers (3904 is the default). For example,
108      *                 "hostname:8080,"
109      * @param topic    The topic to consume
110      * @return a consumer
111      */
112     public static MRConsumer createConsumer(String hostList, String topic) {
113         return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
114     }
115
116     /**
117      * Create a consumer instance with the default timeout and no limit on
118      * messages returned. This consumer operates as an independent consumer
119      * (i.e., not in a group) and is NOT re-startable across sessions.
120      *
121      * @param hostSet The host used in the URL to MR. Entries can be "host:port".
122      * @param topic   The topic to consume
123      * @return a consumer
124      */
125     public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
126         return createConsumer(hostSet, topic, null);
127     }
128
129     /**
130      * Create a consumer instance with server-side filtering, the default
131      * timeout, and no limit on messages returned. This consumer operates as an
132      * independent consumer (i.e., not in a group) and is NOT re-startable
133      * across sessions.
134      *
135      * @param hostSet The host used in the URL to MR. Entries can be "host:port".
136      * @param topic   The topic to consume
137      * @param filter  a filter to use on the server side
138      * @return a consumer
139      */
140     public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
141         return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
142     }
143
144     /**
145      * Create a consumer instance with the default timeout, and no limit on
146      * messages returned. This consumer can operate in a logical group and is
147      * re-startable across sessions when you use the same group and ID on
148      * restart.
149      *
150      * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
151      * @param topic         The topic to consume
152      * @param consumerGroup The name of the consumer group this consumer is part of
153      * @param consumerId    The unique id of this consume in its group
154      * @return a consumer
155      */
156     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
157                                             final String consumerId) {
158         return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
159     }
160
161     /**
162      * Create a consumer instance with the default timeout, and no limit on
163      * messages returned. This consumer can operate in a logical group and is
164      * re-startable across sessions when you use the same group and ID on
165      * restart.
166      *
167      * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
168      * @param topic         The topic to consume
169      * @param consumerGroup The name of the consumer group this consumer is part of
170      * @param consumerId    The unique id of this consume in its group
171      * @param timeoutMs     The amount of time in milliseconds that the server should keep
172      *                      the connection open while waiting for message traffic. Use -1
173      *                      for default timeout.
174      * @param limit         A limit on the number of messages returned in a single call.
175      *                      Use -1 for no limit.
176      * @return a consumer
177      */
178     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
179                                             final String consumerId, int timeoutMs, int limit) {
180         return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
181     }
182
183     /**
184      * Create a consumer instance with the default timeout, and no limit on
185      * messages returned. This consumer can operate in a logical group and is
186      * re-startable across sessions when you use the same group and ID on
187      * restart. This consumer also uses server-side filtering.
188      *
189      * @param hostList      A comma separated list of hosts to use to connect to MR. You
190      *                      can include port numbers (3904 is the default)"
191      * @param topic         The topic to consume
192      * @param consumerGroup The name of the consumer group this consumer is part of
193      * @param consumerId    The unique id of this consume in its group
194      * @param timeoutMs     The amount of time in milliseconds that the server should keep
195      *                      the connection open while waiting for message traffic. Use -1
196      *                      for default timeout.
197      * @param limit         A limit on the number of messages returned in a single call.
198      *                      Use -1 for no limit.
199      * @param filter        A Highland Park filter expression using only built-in filter
200      *                      components. Use null for "no filter".
201      * @return a consumer
202      */
203     public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
204                                             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
205         return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
206                 filter, apiKey, apiSecret);
207     }
208
209     /**
210      * Create a consumer instance with the default timeout, and no limit on
211      * messages returned. This consumer can operate in a logical group and is
212      * re-startable across sessions when you use the same group and ID on
213      * restart. This consumer also uses server-side filtering.
214      *
215      * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
216      * @param topic         The topic to consume
217      * @param consumerGroup The name of the consumer group this consumer is part of
218      * @param consumerId    The unique id of this consume in its group
219      * @param timeoutMs     The amount of time in milliseconds that the server should keep
220      *                      the connection open while waiting for message traffic. Use -1
221      *                      for default timeout.
222      * @param limit         A limit on the number of messages returned in a single call.
223      *                      Use -1 for no limit.
224      * @param filter        A Highland Park filter expression using only built-in filter
225      *                      components. Use null for "no filter".
226      * @return a consumer
227      */
228     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
229                                             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
230         if (MRClientBuilders.sfConsumerMock != null) {
231             return MRClientBuilders.sfConsumerMock;
232         }
233         try {
234             return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
235                     .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
236                     .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
237                     .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
238                     .createMRConsumerImpl();
239         } catch (MalformedURLException e) {
240             throw new IllegalArgumentException(e);
241         }
242     }
243
244     //*************************************************************************
245     //*************************************************************************
246     //*************************************************************************
247
248     /**
249      * Create a publisher that sends each message (or group of messages)
250      * immediately. Most applications should favor higher latency for much
251      * higher message throughput and the "simple publisher" is not a good
252      * choice.
253      *
254      * @param hostlist The host used in the URL to MR. Can be "host:port", can be
255      *                 multiple comma-separated entries.
256      * @param topic    The topic on which to publish messages.
257      * @return a publisher
258      */
259     public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
260         return createBatchingPublisher(hostlist, topic, 1, 1);
261     }
262
263     /**
264      * Create a publisher that batches messages. Be sure to close the publisher
265      * to send the last batch and ensure a clean shutdown. Message payloads are
266      * not compressed.
267      *
268      * @param hostlist     The host used in the URL to MR. Can be "host:port", can be
269      *                     multiple comma-separated entries.
270      * @param topic        The topic on which to publish messages.
271      * @param maxBatchSize The largest set of messages to batch
272      * @param maxAgeMs     The maximum age of a message waiting in a batch
273      * @return a publisher
274      */
275     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
276                                                               long maxAgeMs) {
277         return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
278     }
279
280     /**
281      * Create a publisher that batches messages. Be sure to close the publisher
282      * to send the last batch and ensure a clean shutdown.
283      *
284      * @param hostlist     The host used in the URL to MR. Can be "host:port", can be
285      *                     multiple comma-separated entries.
286      * @param topic        The topic on which to publish messages.
287      * @param maxBatchSize The largest set of messages to batch
288      * @param maxAgeMs     The maximum age of a message waiting in a batch
289      * @param compress     use gzip compression
290      * @return a publisher
291      */
292     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
293                                                               long maxAgeMs, boolean compress) {
294         return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
295     }
296
297     /**
298      * Create a publisher that batches messages. Be sure to close the publisher
299      * to send the last batch and ensure a clean shutdown.
300      *
301      * @param hostSet      A set of hosts to be used in the URL to MR. Can be
302      *                     "host:port". Use multiple entries to enable failover.
303      * @param topic        The topic on which to publish messages.
304      * @param maxBatchSize The largest set of messages to batch
305      * @param maxAgeMs     The maximum age of a message waiting in a batch
306      * @param compress     use gzip compression
307      * @return a publisher
308      */
309     public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
310                                                               long maxAgeMs, boolean compress) {
311         final TreeSet<String> hosts = new TreeSet<>();
312         Collections.addAll(hosts, hostSet);
313         return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
314     }
315
316     /**
317      * Create a publisher that batches messages. Be sure to close the publisher
318      * to send the last batch and ensure a clean shutdown.
319      *
320      * @param hostSet      A set of hosts to be used in the URL to MR. Can be
321      *                     "host:port". Use multiple entries to enable failover.
322      * @param topic        The topic on which to publish messages.
323      * @param maxBatchSize The largest set of messages to batch
324      * @param maxAgeMs     The maximum age of a message waiting in a batch
325      * @param compress     use gzip compression
326      * @return a publisher
327      */
328     public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
329                                                               int maxBatchSize, long maxAgeMs, boolean compress) {
330         return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
331                 .compress(compress).build();
332     }
333
334     /**
335      * Create a publisher that batches messages. Be sure to close the publisher
336      * to send the last batch and ensure a clean shutdown.
337      *
338      * @param host         A host to be used in the URL to MR. Can be "host:port". Use
339      *                     multiple entries to enable failover.
340      * @param topic        The topic on which to publish messages.
341      * @param username     username
342      * @param password     password
343      * @param maxBatchSize The largest set of messages to batch
344      * @param maxAgeMs     The maximum age of a message waiting in a batch
345      * @param compress     use gzip compression
346      * @param protocolFlag http auth or ueb auth or dme2 method
347      * @return MRBatchingPublisher obj
348      */
349     public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
350                                                               final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
351         MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
352                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
353                 .compress(compress).build();
354
355         pub.setHost(host);
356         pub.setUsername(username);
357         pub.setPassword(password);
358         pub.setProtocolFlag(protocolFlag);
359         return pub;
360     }
361
362     /**
363      * Create a publisher that batches messages. Be sure to close the publisher
364      * to send the last batch and ensure a clean shutdown
365      *
366      * @param props props set all properties for publishing message
367      * @return MRBatchingPublisher obj
368      * @throws FileNotFoundException exc
369      * @throws IOException           ioex
370      */
371     public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
372             throws FileNotFoundException, IOException {
373         return createInternalBatchingPublisher(props, withResponse);
374     }
375
376     /**
377      * Create a publisher that batches messages. Be sure to close the publisher
378      * to send the last batch and ensure a clean shutdown
379      *
380      * @param props props set all properties for publishing message
381      * @return MRBatchingPublisher obj
382      * @throws FileNotFoundException exc
383      * @throws IOException           ioex
384      */
385     public static MRBatchingPublisher createBatchingPublisher(Properties props)
386             throws FileNotFoundException, IOException {
387         return createInternalBatchingPublisher(props, false);
388     }
389
390     /**
391      * Create a publisher that batches messages. Be sure to close the publisher
392      * to send the last batch and ensure a clean shutdown
393      *
394      * @param producerFilePath set all properties for publishing message
395      * @return MRBatchingPublisher obj
396      * @throws FileNotFoundException exc
397      * @throws IOException           ioex
398      */
399     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
400             throws FileNotFoundException, IOException {
401         Properties props = new Properties();
402         try (InputStream input = new FileInputStream(producerFilePath)) {
403             props.load(input);
404         }
405         return createBatchingPublisher(props);
406     }
407
408     /**
409      * Create a publisher that will contain send methods that return response
410      * object to user.
411      *
412      * @param producerFilePath set all properties for publishing message
413      * @return MRBatchingPublisher obj
414      * @throws FileNotFoundException exc
415      * @throws IOException           ioex
416      */
417     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
418             throws FileNotFoundException, IOException {
419         Properties props = new Properties();
420         try (InputStream input = new FileInputStream(producerFilePath)) {
421             props.load(input);
422         }
423         return createBatchingPublisher(props, withResponse);
424     }
425
426     protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
427             throws FileNotFoundException, IOException {
428         assert props != null;
429         MRSimplerBatchPublisher pub;
430
431         String messageSentThreadOccurrence = props.getProperty(DmaapClientConst.MESSAGE_SENT_THREAD_OCCURRENCE);
432         if (messageSentThreadOccurrence == null || messageSentThreadOccurrence.isEmpty()) {
433             messageSentThreadOccurrence = props.getProperty(DmaapClientConst.MESSAGE_SENT_THREAD_OCCURRENCE_OLD);
434         }
435
436         if (withResponse) {
437             pub = new MRSimplerBatchPublisher.Builder()
438                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)), MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.SERVICE_NAME)), props.getProperty(DmaapClientConst.TRANSPORT_TYPE))
439                     .onTopic(props.getProperty(DmaapClientConst.TOPIC))
440                     .batchTo(Integer.parseInt(props.getProperty(DmaapClientConst.MAX_BATCH_SIZE)),
441                             Integer.parseInt(props.getProperty(DmaapClientConst.MAX_AGE_MS)))
442                     .compress(Boolean.parseBoolean(props.getProperty(DmaapClientConst.COMPRESS)))
443                     .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence))
444                     .withResponse(withResponse).build();
445         } else {
446             pub = new MRSimplerBatchPublisher.Builder()
447                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)), MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.SERVICE_NAME)), props.getProperty(DmaapClientConst.TRANSPORT_TYPE))
448                     .onTopic(props.getProperty(DmaapClientConst.TOPIC))
449                     .batchTo(Integer.parseInt(props.getProperty(DmaapClientConst.MAX_BATCH_SIZE)),
450                             Integer.parseInt(props.getProperty(DmaapClientConst.MAX_AGE_MS)))
451                     .compress(Boolean.parseBoolean(props.getProperty(DmaapClientConst.COMPRESS)))
452                     .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)).build();
453         }
454         pub.setHost(props.getProperty(DmaapClientConst.HOST));
455         if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
456
457             pub.setAuthKey(props.getProperty(DmaapClientConst.AUTH_KEY));
458             pub.setAuthDate(props.getProperty(DmaapClientConst.AUTH_DATE));
459             pub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
460             pub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
461         } else {
462             pub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
463             pub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
464         }
465         pub.setProtocolFlag(props.getProperty(DmaapClientConst.TRANSPORT_TYPE));
466         pub.setProps(props);
467         prop = new Properties();
468         if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
469             routeFilePath = props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH);
470             routeReader = new FileReader(new File(routeFilePath));
471             File fo = new File(routeFilePath);
472             if (!fo.exists()) {
473                 routeWriter = new FileWriter(new File(routeFilePath));
474             }
475         }
476         return pub;
477     }
478
479     /**
480      * Create an identity manager client to work with API keys.
481      *
482      * @param hostSet   A set of hosts to be used in the URL to MR. Can be
483      *                  "host:port". Use multiple entries to enable failover.
484      * @param apiKey    Your API key
485      * @param apiSecret Your API secret
486      * @return an identity manager
487      */
488     public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
489         MRIdentityManager cim;
490         try {
491             cim = new MRMetaClient(hostSet);
492         } catch (MalformedURLException e) {
493             throw new IllegalArgumentException(e);
494         }
495         cim.setApiCredentials(apiKey, apiSecret);
496         return cim;
497     }
498
499     /**
500      * Create a topic manager for working with topics.
501      *
502      * @param hostSet   A set of hosts to be used in the URL to MR. Can be
503      *                  "host:port". Use multiple entries to enable failover.
504      * @param apiKey    Your API key
505      * @param apiSecret Your API secret
506      * @return a topic manager
507      */
508     public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
509         MRMetaClient tmi;
510         try {
511             tmi = new MRMetaClient(hostSet);
512         } catch (MalformedURLException e) {
513             throw new IllegalArgumentException(e);
514         }
515         tmi.setApiCredentials(apiKey, apiSecret);
516         return tmi;
517     }
518
519     /**
520      * Inject a consumer. Used to support unit tests.
521      *
522      * @param cc
523      */
524     public static void $testInject(MRConsumer cc) {
525         MRClientBuilders.sfConsumerMock = cc;
526     }
527
528     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
529                                             String id, int timeout, int limit, String protocalFlag, String consumerFilePath) {
530
531         MRConsumerImpl sub;
532         try {
533             sub = new MRConsumerImpl.MRConsumerImplBuilder()
534                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
535                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
536                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
537                     .createMRConsumerImpl();
538         } catch (MalformedURLException e) {
539             throw new IllegalArgumentException(e);
540         }
541         sub.setUsername(username);
542         sub.setPassword(password);
543         sub.setHost(host);
544         sub.setProtocolFlag(protocalFlag);
545         sub.setConsumerFilePath(consumerFilePath);
546         return sub;
547
548     }
549
550     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
551                                             String id, String protocalFlag, String consumerFilePath, int timeout, int limit) {
552
553         MRConsumerImpl sub;
554         try {
555             sub = new MRConsumerImpl.MRConsumerImplBuilder()
556                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
557                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
558                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
559                     .createMRConsumerImpl();
560         } catch (MalformedURLException e) {
561             throw new IllegalArgumentException(e);
562         }
563         sub.setUsername(username);
564         sub.setPassword(password);
565         sub.setHost(host);
566         sub.setProtocolFlag(protocalFlag);
567         sub.setConsumerFilePath(consumerFilePath);
568         return sub;
569
570     }
571
572     public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
573         Properties props = new Properties();
574         try (InputStream input = new FileInputStream(consumerFilePath)) {
575             props.load(input);
576         }
577         return createConsumer(props);
578     }
579
580     public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
581         int timeout;
582         ValidatorUtil.validateSubscriber(props);
583         if (props.getProperty(DmaapClientConst.TIMEOUT) != null) {
584             timeout = Integer.parseInt(props.getProperty(DmaapClientConst.TIMEOUT));
585         } else {
586             timeout = -1;
587         }
588         int limit;
589         if (props.getProperty(DmaapClientConst.LIMIT) != null) {
590             limit = Integer.parseInt(props.getProperty(DmaapClientConst.LIMIT));
591         } else {
592             limit = -1;
593         }
594         String group;
595         if (props.getProperty(DmaapClientConst.GROUP) == null) {
596             group = UUID.randomUUID().toString();
597         } else {
598             group = props.getProperty(DmaapClientConst.GROUP);
599         }
600         MRConsumerImpl sub = null;
601         if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
602             sub = new MRConsumerImpl.MRConsumerImplBuilder()
603                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)))
604                     .setTopic(props.getProperty(DmaapClientConst.TOPIC)).setConsumerGroup(group)
605                     .setConsumerId(props.getProperty(DmaapClientConst.ID)).setTimeoutMs(timeout).setLimit(limit)
606                     .setFilter(props.getProperty(DmaapClientConst.FILTER))
607                     .setApiKey_username(props.getProperty(DmaapClientConst.AUTH_KEY))
608                     .setApiSecret_password(props.getProperty(DmaapClientConst.AUTH_DATE)).createMRConsumerImpl();
609             sub.setAuthKey(props.getProperty(DmaapClientConst.AUTH_KEY));
610             sub.setAuthDate(props.getProperty(DmaapClientConst.AUTH_DATE));
611             sub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
612             sub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
613         } else {
614             sub = new MRConsumerImpl.MRConsumerImplBuilder()
615                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(DmaapClientConst.HOST)))
616                     .setTopic(props.getProperty(DmaapClientConst.TOPIC)).setConsumerGroup(group)
617                     .setConsumerId(props.getProperty(DmaapClientConst.ID)).setTimeoutMs(timeout).setLimit(limit)
618                     .setFilter(props.getProperty(DmaapClientConst.FILTER))
619                     .setApiKey_username(props.getProperty(DmaapClientConst.USERNAME))
620                     .setApiSecret_password(props.getProperty(DmaapClientConst.PASSWORD)).createMRConsumerImpl();
621             sub.setUsername(props.getProperty(DmaapClientConst.USERNAME));
622             sub.setPassword(props.getProperty(DmaapClientConst.PASSWORD));
623         }
624
625         sub.setProps(props);
626         sub.setHost(props.getProperty(DmaapClientConst.HOST));
627         sub.setProtocolFlag(props.getProperty(DmaapClientConst.TRANSPORT_TYPE));
628         sub.setfFilter(props.getProperty(DmaapClientConst.FILTER));
629         if (props.getProperty(DmaapClientConst.TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
630             MRConsumerImpl.setRouterFilePath(props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH));
631             routeFilePath = props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH);
632             routeReader = new FileReader(new File(routeFilePath));
633             prop = new Properties();
634             File fo = new File(routeFilePath);
635             if (!fo.exists()) {
636                 routeWriter = new FileWriter(new File(routeFilePath));
637             }
638         }
639
640         return sub;
641     }
642 }