1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * Modifications Copyright © 2021 Orange.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 *******************************************************************************/
26 package org.onap.dmaap.mr.client;
29 import java.io.FileInputStream;
30 import java.io.FileNotFoundException;
31 import java.io.FileReader;
32 import java.io.FileWriter;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.MalformedURLException;
36 import java.util.Collection;
37 import java.util.Collections;
39 import java.util.Properties;
40 import java.util.TreeSet;
41 import java.util.UUID;
42 import javax.ws.rs.core.MultivaluedMap;
44 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
45 import org.onap.dmaap.mr.client.impl.MRMetaClient;
46 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
47 import org.onap.dmaap.mr.tools.ValidatorUtil;
50 * A factory for MR clients.<br/>
52 * Use caution selecting a consumer creator factory. If the call doesn't accept
53 * a consumer group name, then it creates a consumer that is not restartable.
54 * That is, if you stop your process and start it again, your client will NOT
55 * receive any missed messages on the topic. If you need to ensure receipt of
56 * missed messages, then you must use a consumer that's created with a group
57 * name and ID. (If you create multiple consumer processes using the same group,
58 * load is split across them. Be sure to use a different ID for each
65 public class MRClientFactory {
67 private static final String ID = "id";
68 private static final String AUTH_KEY = "authKey";
69 private static final String AUTH_DATE = "authDate";
70 private static final String PASSWORD = "password";
71 private static final String USERNAME = "username";
72 private static final String FILTER = "filter";
73 private static final String HOST = "host";
74 private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
75 private static final String TOPIC = "topic";
76 private static final String TRANSPORT_TYPE = "TransportType";
77 private static final String MAX_BATCH_SIZE = "maxBatchSize";
78 private static final String MAX_AGE_MS = "maxAgeMs";
79 private static final String MESSAGE_SENT_THREAD_OCCURRENCE_OLD = "MessageSentThreadOccurance";
80 private static final String MESSAGE_SENT_THREAD_OCCURRENCE = "MessageSentThreadOccurrence";
81 private static final String GROUP = "group";
82 private static final String SERVICE_NAME = "ServiceName";
83 private static final String PARTNER = "Partner";
84 private static final String ROUTE_OFFER = "routeOffer";
85 private static final String PROTOCOL = "Protocol";
86 private static final String METHOD_TYPE = "MethodType";
87 private static final String CONTENT_TYPE = "contenttype";
88 private static final String LATITUDE = "Latitude";
89 private static final String LONGITUDE = "Longitude";
90 private static final String AFT_ENVIRONMENT = "AFT_ENVIRONMENT";
91 private static final String VERSION = "Version";
92 private static final String ENVIRONMENT = "Environment";
93 private static final String SUB_CONTEXT_PATH = "SubContextPath";
94 private static final String SESSION_STICKINESS_REQUIRED = "sessionstickinessrequired";
95 private static final String PARTITION = "partition";
96 private static final String COMPRESS = "compress";
97 private static final String TIMEOUT = "timeout";
98 private static final String LIMIT = "limit";
99 private static final String AFT_DME2_EP_READ_TIMEOUT_MS = "AFT_DME2_EP_READ_TIMEOUT_MS";
100 private static final String AFT_DME2_ROUNDTRIP_TIMEOUT_MS = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
101 private static final String AFT_DME2_EP_CONN_TIMEOUT = "AFT_DME2_EP_CONN_TIMEOUT";
102 private static final String AFT_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
103 private static final String AFT_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
104 private static final String AFT_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
105 private static final String DME2_PER_HANDLER_TIMEOUT_MS = "DME2_PER_HANDLER_TIMEOUT_MS";
106 private static final String DME2_REPLY_HANDLER_TIMEOUT_MS = "DME2_REPLY_HANDLER_TIMEOUT_MS";
108 private static MultivaluedMap<String, Object> httpHeadersMap;
109 public static Map<String, String> DME2HeadersMap;
110 public static String routeFilePath;
112 public static FileReader routeReader;
114 public static FileWriter routeWriter = null;
115 public static Properties prop = null;
118 * Instantiates MRClientFactory.
120 private MRClientFactory() {
121 //prevents instantiation.
125 * Add getter to avoid direct access to static header map.
129 public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
130 return httpHeadersMap;
134 * Add setter to avoid direct access to static header map.
138 public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
139 httpHeadersMap = headers;
143 * Create a consumer instance with the default timeout and no limit on
144 * messages returned. This consumer operates as an independent consumer
145 * (i.e., not in a group) and is NOT re-startable across sessions.
147 * @param hostList A comma separated list of hosts to use to connect to MR. You
148 * can include port numbers (3904 is the default). For example,
150 * @param topic The topic to consume
153 public static MRConsumer createConsumer(String hostList, String topic) {
154 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
158 * Create a consumer instance with the default timeout and no limit on
159 * messages returned. This consumer operates as an independent consumer
160 * (i.e., not in a group) and is NOT re-startable across sessions.
162 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
163 * @param topic The topic to consume
166 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
167 return createConsumer(hostSet, topic, null);
171 * Create a consumer instance with server-side filtering, the default
172 * timeout, and no limit on messages returned. This consumer operates as an
173 * independent consumer (i.e., not in a group) and is NOT re-startable
176 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
177 * @param topic The topic to consume
178 * @param filter a filter to use on the server side
181 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
182 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
186 * Create a consumer instance with the default timeout, and no limit on
187 * messages returned. This consumer can operate in a logical group and is
188 * re-startable across sessions when you use the same group and ID on
191 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
192 * @param topic The topic to consume
193 * @param consumerGroup The name of the consumer group this consumer is part of
194 * @param consumerId The unique id of this consume in its group
197 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
198 final String consumerId) {
199 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
203 * Create a consumer instance with the default timeout, and no limit on
204 * messages returned. This consumer can operate in a logical group and is
205 * re-startable across sessions when you use the same group and ID on
208 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
209 * @param topic The topic to consume
210 * @param consumerGroup The name of the consumer group this consumer is part of
211 * @param consumerId The unique id of this consume in its group
212 * @param timeoutMs The amount of time in milliseconds that the server should keep
213 * the connection open while waiting for message traffic. Use -1
214 * for default timeout.
215 * @param limit A limit on the number of messages returned in a single call.
216 * Use -1 for no limit.
219 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
220 final String consumerId, int timeoutMs, int limit) {
221 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
225 * Create a consumer instance with the default timeout, and no limit on
226 * messages returned. This consumer can operate in a logical group and is
227 * re-startable across sessions when you use the same group and ID on
228 * restart. This consumer also uses server-side filtering.
230 * @param hostList A comma separated list of hosts to use to connect to MR. You
231 * can include port numbers (3904 is the default)"
232 * @param topic The topic to consume
233 * @param consumerGroup The name of the consumer group this consumer is part of
234 * @param consumerId The unique id of this consume in its group
235 * @param timeoutMs The amount of time in milliseconds that the server should keep
236 * the connection open while waiting for message traffic. Use -1
237 * for default timeout.
238 * @param limit A limit on the number of messages returned in a single call.
239 * Use -1 for no limit.
240 * @param filter A Highland Park filter expression using only built-in filter
241 * components. Use null for "no filter".
244 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
245 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
246 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
247 filter, apiKey, apiSecret);
251 * Create a consumer instance with the default timeout, and no limit on
252 * messages returned. This consumer can operate in a logical group and is
253 * re-startable across sessions when you use the same group and ID on
254 * restart. This consumer also uses server-side filtering.
256 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
257 * @param topic The topic to consume
258 * @param consumerGroup The name of the consumer group this consumer is part of
259 * @param consumerId The unique id of this consume in its group
260 * @param timeoutMs The amount of time in milliseconds that the server should keep
261 * the connection open while waiting for message traffic. Use -1
262 * for default timeout.
263 * @param limit A limit on the number of messages returned in a single call.
264 * Use -1 for no limit.
265 * @param filter A Highland Park filter expression using only built-in filter
266 * components. Use null for "no filter".
269 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
270 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
271 if (MRClientBuilders.sfConsumerMock != null) {
272 return MRClientBuilders.sfConsumerMock;
275 return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
276 .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
277 .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
278 .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
279 .createMRConsumerImpl();
280 } catch (MalformedURLException e) {
281 throw new IllegalArgumentException(e);
285 //*************************************************************************
286 //*************************************************************************
287 //*************************************************************************
290 * Create a publisher that sends each message (or group of messages)
291 * immediately. Most applications should favor higher latency for much
292 * higher message throughput and the "simple publisher" is not a good
295 * @param hostlist The host used in the URL to MR. Can be "host:port", can be
296 * multiple comma-separated entries.
297 * @param topic The topic on which to publish messages.
298 * @return a publisher
300 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
301 return createBatchingPublisher(hostlist, topic, 1, 1);
305 * Create a publisher that batches messages. Be sure to close the publisher
306 * to send the last batch and ensure a clean shutdown. Message payloads are
309 * @param hostlist The host used in the URL to MR. Can be "host:port", can be
310 * multiple comma-separated entries.
311 * @param topic The topic on which to publish messages.
312 * @param maxBatchSize The largest set of messages to batch
313 * @param maxAgeMs The maximum age of a message waiting in a batch
314 * @return a publisher
316 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
318 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
322 * Create a publisher that batches messages. Be sure to close the publisher
323 * to send the last batch and ensure a clean shutdown.
325 * @param hostlist The host used in the URL to MR. Can be "host:port", can be
326 * multiple comma-separated entries.
327 * @param topic The topic on which to publish messages.
328 * @param maxBatchSize The largest set of messages to batch
329 * @param maxAgeMs The maximum age of a message waiting in a batch
330 * @param compress use gzip compression
331 * @return a publisher
333 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
334 long maxAgeMs, boolean compress) {
335 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
339 * Create a publisher that batches messages. Be sure to close the publisher
340 * to send the last batch and ensure a clean shutdown.
342 * @param hostSet A set of hosts to be used in the URL to MR. Can be
343 * "host:port". Use multiple entries to enable failover.
344 * @param topic The topic on which to publish messages.
345 * @param maxBatchSize The largest set of messages to batch
346 * @param maxAgeMs The maximum age of a message waiting in a batch
347 * @param compress use gzip compression
348 * @return a publisher
350 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
351 long maxAgeMs, boolean compress) {
352 final TreeSet<String> hosts = new TreeSet<>();
353 Collections.addAll(hosts, hostSet);
354 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
358 * Create a publisher that batches messages. Be sure to close the publisher
359 * to send the last batch and ensure a clean shutdown.
361 * @param hostSet A set of hosts to be used in the URL to MR. Can be
362 * "host:port". Use multiple entries to enable failover.
363 * @param topic The topic on which to publish messages.
364 * @param maxBatchSize The largest set of messages to batch
365 * @param maxAgeMs The maximum age of a message waiting in a batch
366 * @param compress use gzip compression
367 * @return a publisher
369 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
370 int maxBatchSize, long maxAgeMs, boolean compress) {
371 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
372 .compress(compress).build();
376 * Create a publisher that batches messages. Be sure to close the publisher
377 * to send the last batch and ensure a clean shutdown.
379 * @param host A host to be used in the URL to MR. Can be "host:port". Use
380 * multiple entries to enable failover.
381 * @param topic The topic on which to publish messages.
382 * @param username username
383 * @param password password
384 * @param maxBatchSize The largest set of messages to batch
385 * @param maxAgeMs The maximum age of a message waiting in a batch
386 * @param compress use gzip compression
387 * @param protocolFlag http auth or ueb auth or dme2 method
388 * @return MRBatchingPublisher obj
390 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
391 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
392 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
393 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
394 .compress(compress).build();
397 pub.setUsername(username);
398 pub.setPassword(password);
399 pub.setProtocolFlag(protocolFlag);
404 * Create a publisher that batches messages. Be sure to close the publisher
405 * to send the last batch and ensure a clean shutdown
407 * @param props props set all properties for publishing message
408 * @return MRBatchingPublisher obj
409 * @throws FileNotFoundException exc
410 * @throws IOException ioex
412 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
413 throws FileNotFoundException, IOException {
414 return createInternalBatchingPublisher(props, withResponse);
418 * Create a publisher that batches messages. Be sure to close the publisher
419 * to send the last batch and ensure a clean shutdown
421 * @param props props set all properties for publishing message
422 * @return MRBatchingPublisher obj
423 * @throws FileNotFoundException exc
424 * @throws IOException ioex
426 public static MRBatchingPublisher createBatchingPublisher(Properties props)
427 throws FileNotFoundException, IOException {
428 return createInternalBatchingPublisher(props, false);
432 * Create a publisher that batches messages. Be sure to close the publisher
433 * to send the last batch and ensure a clean shutdown
435 * @param producerFilePath set all properties for publishing message
436 * @return MRBatchingPublisher obj
437 * @throws FileNotFoundException exc
438 * @throws IOException ioex
440 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
441 throws FileNotFoundException, IOException {
442 Properties props = new Properties();
443 try (InputStream input = new FileInputStream(producerFilePath)) {
446 return createBatchingPublisher(props);
450 * Create a publisher that will contain send methods that return response
453 * @param producerFilePath set all properties for publishing message
454 * @return MRBatchingPublisher obj
455 * @throws FileNotFoundException exc
456 * @throws IOException ioex
458 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
459 throws FileNotFoundException, IOException {
460 Properties props = new Properties();
461 try (InputStream input = new FileInputStream(producerFilePath)) {
464 return createBatchingPublisher(props, withResponse);
467 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
468 throws FileNotFoundException, IOException {
469 assert props != null;
470 MRSimplerBatchPublisher pub;
472 String messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE);
473 if (messageSentThreadOccurrence == null || messageSentThreadOccurrence.isEmpty()) {
474 messageSentThreadOccurrence = props.getProperty(MESSAGE_SENT_THREAD_OCCURRENCE_OLD);
478 pub = new MRSimplerBatchPublisher.Builder()
479 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
480 .onTopic(props.getProperty(TOPIC))
481 .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
482 Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
483 .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
484 .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence))
485 .withResponse(withResponse).build();
487 pub = new MRSimplerBatchPublisher.Builder()
488 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty(SERVICE_NAME)), props.getProperty(TRANSPORT_TYPE))
489 .onTopic(props.getProperty(TOPIC))
490 .batchTo(Integer.parseInt(props.getProperty(MAX_BATCH_SIZE)),
491 Integer.parseInt(props.getProperty(MAX_AGE_MS).toString()))
492 .compress(Boolean.parseBoolean(props.getProperty(COMPRESS)))
493 .httpThreadTime(Integer.parseInt(messageSentThreadOccurrence)).build();
495 pub.setHost(props.getProperty(HOST));
496 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
498 pub.setAuthKey(props.getProperty(AUTH_KEY));
499 pub.setAuthDate(props.getProperty(AUTH_DATE));
500 pub.setUsername(props.getProperty(USERNAME));
501 pub.setPassword(props.getProperty(PASSWORD));
503 pub.setUsername(props.getProperty(USERNAME));
504 pub.setPassword(props.getProperty(PASSWORD));
506 pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
508 prop = new Properties();
509 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
510 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
511 routeReader = new FileReader(new File(routeFilePath));
512 File fo = new File(routeFilePath);
514 routeWriter = new FileWriter(new File(routeFilePath));
521 * Create an identity manager client to work with API keys.
523 * @param hostSet A set of hosts to be used in the URL to MR. Can be
524 * "host:port". Use multiple entries to enable failover.
525 * @param apiKey Your API key
526 * @param apiSecret Your API secret
527 * @return an identity manager
529 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
530 MRIdentityManager cim;
532 cim = new MRMetaClient(hostSet);
533 } catch (MalformedURLException e) {
534 throw new IllegalArgumentException(e);
536 cim.setApiCredentials(apiKey, apiSecret);
541 * Create a topic manager for working with topics.
543 * @param hostSet A set of hosts to be used in the URL to MR. Can be
544 * "host:port". Use multiple entries to enable failover.
545 * @param apiKey Your API key
546 * @param apiSecret Your API secret
547 * @return a topic manager
549 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
552 tmi = new MRMetaClient(hostSet);
553 } catch (MalformedURLException e) {
554 throw new IllegalArgumentException(e);
556 tmi.setApiCredentials(apiKey, apiSecret);
561 * Inject a consumer. Used to support unit tests.
565 public static void $testInject(MRConsumer cc) {
566 MRClientBuilders.sfConsumerMock = cc;
569 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
570 String id, int timeout, int limit, String protocalFlag, String consumerFilePath) {
574 sub = new MRConsumerImpl.MRConsumerImplBuilder()
575 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
576 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
577 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
578 .createMRConsumerImpl();
579 } catch (MalformedURLException e) {
580 throw new IllegalArgumentException(e);
582 sub.setUsername(username);
583 sub.setPassword(password);
585 sub.setProtocolFlag(protocalFlag);
586 sub.setConsumerFilePath(consumerFilePath);
591 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
592 String id, String protocalFlag, String consumerFilePath, int timeout, int limit) {
596 sub = new MRConsumerImpl.MRConsumerImplBuilder()
597 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
598 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(timeout).setLimit(limit)
599 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
600 .createMRConsumerImpl();
601 } catch (MalformedURLException e) {
602 throw new IllegalArgumentException(e);
604 sub.setUsername(username);
605 sub.setPassword(password);
607 sub.setProtocolFlag(protocalFlag);
608 sub.setConsumerFilePath(consumerFilePath);
613 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
614 Properties props = new Properties();
615 try (InputStream input = new FileInputStream(consumerFilePath)) {
618 return createConsumer(props);
621 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
623 ValidatorUtil.validateSubscriber(props);
624 if (props.getProperty(TIMEOUT) != null) {
625 timeout = Integer.parseInt(props.getProperty(TIMEOUT));
630 if (props.getProperty(LIMIT) != null) {
631 limit = Integer.parseInt(props.getProperty(LIMIT));
636 if (props.getProperty(GROUP) == null) {
637 group = UUID.randomUUID().toString();
639 group = props.getProperty(GROUP);
641 MRConsumerImpl sub = null;
642 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.AUTH_KEY.getValue())) {
643 sub = new MRConsumerImpl.MRConsumerImplBuilder()
644 .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
645 .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
646 .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
647 .setFilter(props.getProperty(FILTER))
648 .setApiKey_username(props.getProperty(AUTH_KEY))
649 .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
650 sub.setAuthKey(props.getProperty(AUTH_KEY));
651 sub.setAuthDate(props.getProperty(AUTH_DATE));
652 sub.setUsername(props.getProperty(USERNAME));
653 sub.setPassword(props.getProperty(PASSWORD));
655 sub = new MRConsumerImpl.MRConsumerImplBuilder()
656 .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
657 .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
658 .setConsumerId(props.getProperty(ID)).setTimeoutMs(timeout).setLimit(limit)
659 .setFilter(props.getProperty(FILTER))
660 .setApiKey_username(props.getProperty(USERNAME))
661 .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
662 sub.setUsername(props.getProperty(USERNAME));
663 sub.setPassword(props.getProperty(PASSWORD));
667 sub.setHost(props.getProperty(HOST));
668 sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
669 sub.setfFilter(props.getProperty(FILTER));
670 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolType.DME2.getValue())) {
671 MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
672 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
673 routeReader = new FileReader(new File(routeFilePath));
674 prop = new Properties();
675 File fo = new File(routeFilePath);
677 routeWriter = new FileWriter(new File(routeFilePath));