[DMAAP-CLIENT] First sonar issues review part2
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  *   Modifications Copyright © 2021 Orange.
9  *  ================================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *        http://www.apache.org/licenses/LICENSE-2.0
14  *
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *  ============LICENSE_END=========================================================
21  *
22  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  *
24  *******************************************************************************/
25
26 package org.onap.dmaap.mr.client;
27
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.FileNotFoundException;
31 import java.io.FileReader;
32 import java.io.FileWriter;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.MalformedURLException;
36 import java.util.Collection;
37 import java.util.Collections;
38 import java.util.Map;
39 import java.util.Properties;
40 import java.util.TreeSet;
41 import java.util.UUID;
42 import javax.ws.rs.core.MultivaluedMap;
43
44 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
45 import org.onap.dmaap.mr.client.impl.MRMetaClient;
46 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
47 import org.onap.dmaap.mr.tools.ValidatorUtil;
48
49 /**
50  * A factory for MR clients.<br/>
51  * <br/>
52  * Use caution selecting a consumer creator factory. If the call doesn't accept
53  * a consumer group name, then it creates a consumer that is not restartable.
54  * That is, if you stop your process and start it again, your client will NOT
55  * receive any missed messages on the topic. If you need to ensure receipt of
56  * missed messages, then you must use a consumer that's created with a group
57  * name and ID. (If you create multiple consumer processes using the same group,
58  * load is split across them. Be sure to use a different ID for each
59  * instance.)<br/>
60  * <br/>
61  * Publishers
62  *
63  * @author author
64  */
65 public class MRClientFactory {
66
67     private static final String ID = "id";
68     private static final String AUTH_KEY = "authKey";
69     private static final String AUTH_DATE = "authDate";
70     private static final String PASSWORD = "password";
71     private static final String USERNAME = "username";
72     private static final String FILTER = "filter";
73     private static final String HOST = "host";
74     private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
75     private static final String TOPIC = "topic";
76     private static final String TRANSPORT_TYPE = "TransportType";
77     private static final String MAX_BATCH_SIZE = "maxBatchSize";
78     private static final String MAX_AGE_MS = "maxAgeMs";
79     private static final String MESSAGE_SENT_THREAD_OCCURRENCE_OLD = "MessageSentThreadOccurance";
80     private static final String MESSAGE_SENT_THREAD_OCCURRENCE = "MessageSentThreadOccurrence";
81     private static final String GROUP = "group";
82     private static final String SERVICE_NAME = "ServiceName";
83     private static final String PARTNER = "Partner";
84     private static final String ROUTE_OFFER = "routeOffer";
85     private static final String PROTOCOL = "Protocol";
86     private static final String METHOD_TYPE = "MethodType";
87     private static final String CONTENT_TYPE = "contenttype";
88     private static final String LATITUDE = "Latitude";
89     private static final String LONGITUDE = "Longitude";
90     private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
91     private static final String VERSION = "Version";
92     private static final String ENVIRONMENT = "Environment";
93     private static final String SUB_CONTEXT_PATH = "SubContextPath";
94     private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
95     private static final String PARTITION = "partition";
96     private static final String COMPRESS = "compress";
97     private static final String TIMEOUT = "timeout";
98     private static final String LIMIT = "limit";
99     private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
100     private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
101     private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
102     private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
103     private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
104     private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
105     private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS";
106     private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS";
107
108     private static MultivaluedMap<String, Object> httpHeadersMap;
109     public static Map<String, String> DME2HeadersMap;
110     public static String routeFilePath;
111
112     public static FileReader routeReader;
113
114     public static FileWriter routeWriter = null;
115     public static Properties prop = null;
116
117     /**
118      * Instantiates MRClientFactory.
119      */
120     private MRClientFactory() {
121         //prevents instantiation.
122     }
123
124     /**
125      * Add getter to avoid direct access to static header map.
126      *
127      * @return
128      */
129     public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
130         return httpHeadersMap;
131     }
132
133     /**
134      * Add setter to avoid direct access to static header map.
135      *
136      * @param headers
137      */
138     public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
139         httpHeadersMap = headers;
140     }
141
142     /**
143      * Create a consumer instance with the default timeout and no limit on
144      * messages returned. This consumer operates as an independent consumer
145      * (i.e., not in a group) and is NOT re-startable across sessions.
146      *
147      * @param hostList A comma separated list of hosts to use to connect to MR. You
148      *                 can include port numbers (3904 is the default). For example,
149      *                 "hostname:8080,"
150      * @param topic    The topic to consume
151      * @return a consumer
152      */
153     public static MRConsumer createConsumer(String hostList, String topic) {
154         return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
155     }
156
157     /**
158      * Create a consumer instance with the default timeout and no limit on
159      * messages returned. This consumer operates as an independent consumer
160      * (i.e., not in a group) and is NOT re-startable across sessions.
161      *
162      * @param hostSet The host used in the URL to MR. Entries can be "host:port".
163      * @param topic   The topic to consume
164      * @return a consumer
165      */
166     public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
167         return createConsumer(hostSet, topic, null);
168     }
169
170     /**
171      * Create a consumer instance with server-side filtering, the default
172      * timeout, and no limit on messages returned. This consumer operates as an
173      * independent consumer (i.e., not in a group) and is NOT re-startable
174      * across sessions.
175      *
176      * @param hostSet The host used in the URL to MR. Entries can be "host:port".
177      * @param topic   The topic to consume
178      * @param filter  a filter to use on the server side
179      * @return a consumer
180      */
181     public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
182         return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
183     }
184
185     /**
186      * Create a consumer instance with the default timeout, and no limit on
187      * messages returned. This consumer can operate in a logical group and is
188      * re-startable across sessions when you use the same group and ID on
189      * restart.
190      *
191      * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
192      * @param topic         The topic to consume
193      * @param consumerGroup The name of the consumer group this consumer is part of
194      * @param consumerId    The unique id of this consume in its group
195      * @return a consumer
196      */
197     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
198                                             final String consumerId) {
199         return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
200     }
201
202     /**
203      * Create a consumer instance with the default timeout, and no limit on
204      * messages returned. This consumer can operate in a logical group and is
205      * re-startable across sessions when you use the same group and ID on
206      * restart.
207      *
208      * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
209      * @param topic         The topic to consume
210      * @param consumerGroup The name of the consumer group this consumer is part of
211      * @param consumerId    The unique id of this consume in its group
212      * @param timeoutMs     The amount of time in milliseconds that the server should keep
213      *                      the connection open while waiting for message traffic. Use -1
214      *                      for default timeout.
215      * @param limit         A limit on the number of messages returned in a single call.
216      *                      Use -1 for no limit.
217      * @return a consumer
218      */
219     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
220                                             final String consumerId, int timeoutMs, int limit) {
221         return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
222     }
223
224     /**
225      * Create a consumer instance with the default timeout, and no limit on
226      * messages returned. This consumer can operate in a logical group and is
227      * re-startable across sessions when you use the same group and ID on
228      * restart. This consumer also uses server-side filtering.
229      *
230      * @param hostList      A comma separated list of hosts to use to connect to MR. You
231      *                      can include port numbers (3904 is the default)"
232      * @param topic         The topic to consume
233      * @param consumerGroup The name of the consumer group this consumer is part of
234      * @param consumerId    The unique id of this consume in its group
235      * @param timeoutMs     The amount of time in milliseconds that the server should keep
236      *                      the connection open while waiting for message traffic. Use -1
237      *                      for default timeout.
238      * @param limit         A limit on the number of messages returned in a single call.
239      *                      Use -1 for no limit.
240      * @param filter        A Highland Park filter expression using only built-in filter
241      *                      components. Use null for "no filter".
242      * @return a consumer
243      */
244     public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
245                                             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
246         return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
247                 filter, apiKey, apiSecret);
248     }
249
250     /**
251      * Create a consumer instance with the default timeout, and no limit on
252      * messages returned. This consumer can operate in a logical group and is
253      * re-startable across sessions when you use the same group and ID on
254      * restart. This consumer also uses server-side filtering.
255      *
256      * @param hostSet       The host used in the URL to MR. Entries can be "host:port".
257      * @param topic         The topic to consume
258      * @param consumerGroup The name of the consumer group this consumer is part of
259      * @param consumerId    The unique id of this consume in its group
260      * @param timeoutMs     The amount of time in milliseconds that the server should keep
261      *                      the connection open while waiting for message traffic. Use -1
262      *                      for default timeout.
263      * @param limit         A limit on the number of messages returned in a single call.
264      *                      Use -1 for no limit.
265      * @param filter        A Highland Park filter expression using only built-in filter
266      *                      components. Use null for "no filter".
267      * @return a consumer
268      */
269     public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
270                                             final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
271         if (MRClientBuilders.sfConsumerMock != null) {
272             return MRClientBuilders.sfConsumerMock;
273         }
274         try {
275             return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
276                     .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
277                     .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
278                     .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
279                     .createMRConsumerImpl();
280         } catch (MalformedURLException e) {
281             throw new IllegalArgumentException(e);
282         }
283     }
284
285     //*************************************************************************
286     //*************************************************************************
287     //*************************************************************************
288
289     /**
290      * Create a publisher that sends each message (or group of messages)
291      * immediately. Most applications should favor higher latency for much
292      * higher message throughput and the "simple publisher" is not a good
293      * choice.
294      *
295      * @param hostlist The host used in the URL to MR. Can be "host:port", can be
296      *                 multiple comma-separated entries.
297      * @param topic    The topic on which to publish messages.
298      * @return a publisher
299      */
300     public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
301         return createBatchingPublisher(hostlist, topic, 1, 1);
302     }
303
304     /**
305      * Create a publisher that batches messages. Be sure to close the publisher
306      * to send the last batch and ensure a clean shutdown. Message payloads are
307      * not compressed.
308      *
309      * @param hostlist     The host used in the URL to MR. Can be "host:port", can be
310      *                     multiple comma-separated entries.
311      * @param topic        The topic on which to publish messages.
312      * @param maxBatchSize The largest set of messages to batch
313      * @param maxAgeMs     The maximum age of a message waiting in a batch
314      * @return a publisher
315      */
316     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
317                                                               long maxAgeMs) {
318         return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
319     }
320
321     /**
322      * Create a publisher that batches messages. Be sure to close the publisher
323      * to send the last batch and ensure a clean shutdown.
324      *
325      * @param hostlist     The host used in the URL to MR. Can be "host:port", can be
326      *                     multiple comma-separated entries.
327      * @param topic        The topic on which to publish messages.
328      * @param maxBatchSize The largest set of messages to batch
329      * @param maxAgeMs     The maximum age of a message waiting in a batch
330      * @param compress     use gzip compression
331      * @return a publisher
332      */
333     public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
334                                                               long maxAgeMs, boolean compress) {
335         return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
336     }
337
338     /**
339      * Create a publisher that batches messages. Be sure to close the publisher
340      * to send the last batch and ensure a clean shutdown.
341      *
342      * @param hostSet      A set of hosts to be used in the URL to MR. Can be
343      *                     "host:port". Use multiple entries to enable failover.
344      * @param topic        The topic on which to publish messages.
345      * @param maxBatchSize The largest set of messages to batch
346      * @param maxAgeMs     The maximum age of a message waiting in a batch
347      * @param compress     use gzip compression
348      * @return a publisher
349      */
350     public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
351                                                               long maxAgeMs, boolean compress) {
352         final TreeSet<String> hosts = new TreeSet<>();
353         Collections.addAll(hosts, hostSet);
354         return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
355     }
356
357     /**
358      * Create a publisher that batches messages. Be sure to close the publisher
359      * to send the last batch and ensure a clean shutdown.
360      *
361      * @param hostSet      A set of hosts to be used in the URL to MR. Can be
362      *                     "host:port". Use multiple entries to enable failover.
363      * @param topic        The topic on which to publish messages.
364      * @param maxBatchSize The largest set of messages to batch
365      * @param maxAgeMs     The maximum age of a message waiting in a batch
366      * @param compress     use gzip compression
367      * @return a publisher
368      */
369     public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
370                                                               int maxBatchSize, long maxAgeMs, boolean compress) {
371         return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
372                 .compress(compress).build();
373     }
374
375     /**
376      * Create a publisher that batches messages. Be sure to close the publisher
377      * to send the last batch and ensure a clean shutdown.
378      *
379      * @param host         A host to be used in the URL to MR. Can be "host:port". Use
380      *                     multiple entries to enable failover.
381      * @param topic        The topic on which to publish messages.
382      * @param username     username
383      * @param password     password
384      * @param maxBatchSize The largest set of messages to batch
385      * @param maxAgeMs     The maximum age of a message waiting in a batch
386      * @param compress     use gzip compression
387      * @param protocolFlag http auth or ueb auth or dme2 method
388      * @return MRBatchingPublisher obj
389      */
390     public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
391                                                               final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
392         MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
393                 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
394                 .compress(compress).build();
395
396         pub.setHost(host);
397         pub.setUsername(username);
398         pub.setPassword(password);
399         pub.setProtocolFlag(protocolFlag);
400         return pub;
401     }
402
403     /**
404      * Create a publisher that batches messages. Be sure to close the publisher
405      * to send the last batch and ensure a clean shutdown
406      *
407      * @param props props set all properties for publishing message
408      * @return MRBatchingPublisher obj
409      * @throws FileNotFoundException exc
410      * @throws IOException           ioex
411      */
412     public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
413             throws FileNotFoundException, IOException {
414         return createInternalBatchingPublisher(props, withResponse);
415     }
416
417     /**
418      * Create a publisher that batches messages. Be sure to close the publisher
419      * to send the last batch and ensure a clean shutdown
420      *
421      * @param props props set all properties for publishing message
422      * @return MRBatchingPublisher obj
423      * @throws FileNotFoundException exc
424      * @throws IOException           ioex
425      */
426     public static MRBatchingPublisher createBatchingPublisher(Properties props)
427             throws FileNotFoundException, IOException {
428         return createInternalBatchingPublisher(props, false);
429     }
430
431     /**
432      * Create a publisher that batches messages. Be sure to close the publisher
433      * to send the last batch and ensure a clean shutdown
434      *
435      * @param producerFilePath set all properties for publishing message
436      * @return MRBatchingPublisher obj
437      * @throws FileNotFoundException exc
438      * @throws IOException           ioex
439      */
440     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
441             throws FileNotFoundException, IOException {
442         Properties props = new Properties();
443         try (InputStream input = new FileInputStream(producerFilePath)) {
444             props.load(input);
445         }
446         return createBatchingPublisher(props);
447     }
448
449     /**
450      * Create a publisher that will contain send methods that return response
451      * object to user.
452      *
453      * @param producerFilePath set all properties for publishing message
454      * @return MRBatchingPublisher obj
455      * @throws FileNotFoundException exc
456      * @throws IOException           ioex
457      */
458     public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
459             throws FileNotFoundException, IOException {
460         Properties props = new Properties();
461         try (InputStream input = new FileInputStream(producerFilePath)) {
462             props.load(input);
463         }
464         return createBatchingPublisher(props, withResponse);
465     }
466
467     protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
468             throws FileNotFoundException, IOException {
469         assert props != null;
470         MRSimplerBatchPublisher pub;
471
472         String messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE);
473         if (messageSentThreadOccurrence == null || messageSentThreadOccurrence.isEmpty()) {
474             messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE_OLD);
475         }
476
477         if (withResponse) {
478             pub = new MRSimplerBatchPublisher.Builder()
479                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
480                     .onTopic(props.getProperty(TOPIC))
481                     .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
482                             Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
483                     .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
484                     .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence))
485                     .withResponse(withResponse).build();
486         } else {
487             pub = new MRSimplerBatchPublisher.Builder()
488                     .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
489                     .onTopic(props.getProperty(TOPIC))
490                     .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
491                             Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
492                     .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
493                     .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)).build();
494         }
495         pub.setHost(props.getProperty(HOST));
496         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
497
498             pub.setAuthKey(props.getProperty(AUTH_KEY));
499             pub.setAuthDate(props.getProperty(AUTH_DATE));
500             pub.setUsername(props.getProperty(USERNAME));
501             pub.setPassword(props.getProperty(PASSWORD));
502         } else {
503             pub.setUsername(props.getProperty(USERNAME));
504             pub.setPassword(props.getProperty(PASSWORD));
505         }
506         pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
507         pub.setProps(props);
508         prop = new Properties();
509         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
510             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
511             routeReader = new FileReader(new File(routeFilePath));
512             File fo = new File(routeFilePath);
513             if (!fo.exists()) {
514                 routeWriter = new FileWriter(new File(routeFilePath));
515             }
516         }
517         return pub;
518     }
519
520     /**
521      * Create an identity manager client to work with API keys.
522      *
523      * @param hostSet   A set of hosts to be used in the URL to MR. Can be
524      *                  "host:port". Use multiple entries to enable failover.
525      * @param apiKey    Your API key
526      * @param apiSecret Your API secret
527      * @return an identity manager
528      */
529     public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
530         MRIdentityManager cim;
531         try {
532             cim = new MRMetaClient(hostSet);
533         } catch (MalformedURLException e) {
534             throw new IllegalArgumentException(e);
535         }
536         cim.setApiCredentials(apiKey, apiSecret);
537         return cim;
538     }
539
540     /**
541      * Create a topic manager for working with topics.
542      *
543      * @param hostSet   A set of hosts to be used in the URL to MR. Can be
544      *                  "host:port". Use multiple entries to enable failover.
545      * @param apiKey    Your API key
546      * @param apiSecret Your API secret
547      * @return a topic manager
548      */
549     public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
550         MRMetaClient tmi;
551         try {
552             tmi = new MRMetaClient(hostSet);
553         } catch (MalformedURLException e) {
554             throw new IllegalArgumentException(e);
555         }
556         tmi.setApiCredentials(apiKey, apiSecret);
557         return tmi;
558     }
559
560     /**
561      * Inject a consumer. Used to support unit tests.
562      *
563      * @param cc
564      */
565     public static void $testInject(MRConsumer cc) {
566         MRClientBuilders.sfConsumerMock = cc;
567     }
568
569     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
570                                             String id, int timeout, int limit, String protocalFlag, String consumerFilePath) {
571
572         MRConsumerImpl sub;
573         try {
574             sub = new MRConsumerImpl.MRConsumerImplBuilder()
575                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
576                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
577                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
578                     .createMRConsumerImpl();
579         } catch (MalformedURLException e) {
580             throw new IllegalArgumentException(e);
581         }
582         sub.setUsername(username);
583         sub.setPassword(password);
584         sub.setHost(host);
585         sub.setProtocolFlag(protocalFlag);
586         sub.setConsumerFilePath(consumerFilePath);
587         return sub;
588
589     }
590
591     public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
592                                             String id, String protocalFlag, String consumerFilePath, int timeout, int limit) {
593
594         MRConsumerImpl sub;
595         try {
596             sub = new MRConsumerImpl.MRConsumerImplBuilder()
597                     .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
598                     .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
599                     .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
600                     .createMRConsumerImpl();
601         } catch (MalformedURLException e) {
602             throw new IllegalArgumentException(e);
603         }
604         sub.setUsername(username);
605         sub.setPassword(password);
606         sub.setHost(host);
607         sub.setProtocolFlag(protocalFlag);
608         sub.setConsumerFilePath(consumerFilePath);
609         return sub;
610
611     }
612
613     public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
614         Properties props = new Properties();
615         try (InputStream input = new FileInputStream(consumerFilePath)) {
616             props.load(input);
617         }
618         return createConsumer(props);
619     }
620
621     public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
622         int timeout;
623         ValidatorUtil.validateSubscriber(props);
624         if (props.getProperty(TIMEOUT) != null) {
625             timeout = Integer.parseInt(props.getProperty(TIMEOUT));
626         } else {
627             timeout = -1;
628         }
629         int limit;
630         if (props.getProperty(LIMIT) != null) {
631             limit = Integer.parseInt(props.getProperty(LIMIT));
632         } else {
633             limit = -1;
634         }
635         String group;
636         if (props.getProperty(GROUP) == null) {
637             group = UUID.randomUUID().toString();
638         } else {
639             group = props.getProperty(GROUP);
640         }
641         MRConsumerImpl sub = null;
642         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
643             sub = new MRConsumerImpl.MRConsumerImplBuilder()
644                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
645                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
646                     .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
647                     .setFilter(props.getProperty(FILTER))
648                     .setApiKey_username(props.getProperty(AUTH_KEY))
649                     .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
650             sub.setAuthKey(props.getProperty(AUTH_KEY));
651             sub.setAuthDate(props.getProperty(AUTH_DATE));
652             sub.setUsername(props.getProperty(USERNAME));
653             sub.setPassword(props.getProperty(PASSWORD));
654         } else {
655             sub = new MRConsumerImpl.MRConsumerImplBuilder()
656                     .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
657                     .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
658                     .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
659                     .setFilter(props.getProperty(FILTER))
660                     .setApiKey_username(props.getProperty(USERNAME))
661                     .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
662             sub.setUsername(props.getProperty(USERNAME));
663             sub.setPassword(props.getProperty(PASSWORD));
664         }
665
666         sub.setProps(props);
667         sub.setHost(props.getProperty(HOST));
668         sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
669         sub.setfFilter(props.getProperty(FILTER));
670         if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
671             MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
672             routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
673             routeReader = new FileReader(new File(routeFilePath));
674             prop = new Properties();
675             File fo = new File(routeFilePath);
676             if (!fo.exists()) {
677                 routeWriter = new FileWriter(new File(routeFilePath));
678             }
679         }
680
681         return sub;
682     }
683 }