1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client;
25 import java.io.FileNotFoundException;
26 import java.io.FileReader;
27 import java.io.FileWriter;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.util.Collection;
32 import java.util.Properties;
33 import java.util.TreeSet;
34 import java.util.UUID;
36 import javax.ws.rs.core.MultivaluedMap;
38 import com.att.nsa.mr.client.impl.MRConsumerImpl;
39 import com.att.nsa.mr.client.impl.MRMetaClient;
40 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
44 * A factory for MR clients.<br/>
46 * Use caution selecting a consumer creator factory. If the call doesn't accept
47 * a consumer group name, then it creates a consumer that is not restartable.
48 * That is, if you stop your process and start it again, your client will NOT
49 * receive any missed messages on the topic. If you need to ensure receipt of
50 * missed messages, then you must use a consumer that's created with a group
51 * name and ID. (If you create multiple consumer processes using the same group,
52 * load is split across them. Be sure to use a different ID for each
59 public class MRClientFactory {
60 public static MultivaluedMap<String, Object> HTTPHeadersMap;
61 public static Map<String, String> DME2HeadersMap;
62 public static String routeFilePath;
64 public static FileReader routeReader;
66 public static FileWriter routeWriter = null;
67 public static Properties prop = null;
70 // props= new Properties();
72 * Create a consumer instance with the default timeout and no limit on
73 * messages returned. This consumer operates as an independent consumer
74 * (i.e., not in a group) and is NOT re-startable across sessions.
77 * A comma separated list of hosts to use to connect to MR. You
78 * can include port numbers (3904 is the default). For example,
82 * The topic to consume
86 public static MRConsumer createConsumer(String hostList, String topic) {
87 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
91 * Create a consumer instance with the default timeout and no limit on
92 * messages returned. This consumer operates as an independent consumer
93 * (i.e., not in a group) and is NOT re-startable across sessions.
96 * The host used in the URL to MR. Entries can be "host:port".
98 * The topic to consume
102 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
103 return createConsumer(hostSet, topic, null);
107 * Create a consumer instance with server-side filtering, the default
108 * timeout, and no limit on messages returned. This consumer operates as an
109 * independent consumer (i.e., not in a group) and is NOT re-startable
113 * The host used in the URL to MR. Entries can be "host:port".
115 * The topic to consume
117 * a filter to use on the server side
121 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
122 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
126 * Create a consumer instance with the default timeout, and no limit on
127 * messages returned. This consumer can operate in a logical group and is
128 * re-startable across sessions when you use the same group and ID on
132 * The host used in the URL to MR. Entries can be "host:port".
134 * The topic to consume
135 * @param consumerGroup
136 * The name of the consumer group this consumer is part of
138 * The unique id of this consume in its group
142 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
143 final String consumerId) {
144 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
148 * Create a consumer instance with the default timeout, and no limit on
149 * messages returned. This consumer can operate in a logical group and is
150 * re-startable across sessions when you use the same group and ID on
154 * The host used in the URL to MR. Entries can be "host:port".
156 * The topic to consume
157 * @param consumerGroup
158 * The name of the consumer group this consumer is part of
160 * The unique id of this consume in its group
162 * The amount of time in milliseconds that the server should keep
163 * the connection open while waiting for message traffic. Use -1
164 * for default timeout.
166 * A limit on the number of messages returned in a single call.
167 * Use -1 for no limit.
171 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
172 final String consumerId, int timeoutMs, int limit) {
173 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
177 * Create a consumer instance with the default timeout, and no limit on
178 * messages returned. This consumer can operate in a logical group and is
179 * re-startable across sessions when you use the same group and ID on
180 * restart. This consumer also uses server-side filtering.
183 * A comma separated list of hosts to use to connect to MR. You
184 * can include port numbers (3904 is the default). For example,
185 * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
187 * The topic to consume
188 * @param consumerGroup
189 * The name of the consumer group this consumer is part of
191 * The unique id of this consume in its group
193 * The amount of time in milliseconds that the server should keep
194 * the connection open while waiting for message traffic. Use -1
195 * for default timeout.
197 * A limit on the number of messages returned in a single call.
198 * Use -1 for no limit.
200 * A Highland Park filter expression using only built-in filter
201 * components. Use null for "no filter".
205 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
206 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
207 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
208 filter, apiKey, apiSecret);
212 * Create a consumer instance with the default timeout, and no limit on
213 * messages returned. This consumer can operate in a logical group and is
214 * re-startable across sessions when you use the same group and ID on
215 * restart. This consumer also uses server-side filtering.
218 * The host used in the URL to MR. Entries can be "host:port".
220 * The topic to consume
221 * @param consumerGroup
222 * The name of the consumer group this consumer is part of
224 * The unique id of this consume in its group
226 * The amount of time in milliseconds that the server should keep
227 * the connection open while waiting for message traffic. Use -1
228 * for default timeout.
230 * A limit on the number of messages returned in a single call.
231 * Use -1 for no limit.
233 * A Highland Park filter expression using only built-in filter
234 * components. Use null for "no filter".
238 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
239 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
240 if (MRClientBuilders.sfConsumerMock != null)
241 return MRClientBuilders.sfConsumerMock;
243 return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
245 } catch (MalformedURLException e) {
246 throw new IllegalArgumentException(e);
250 /*************************************************************************/
251 /*************************************************************************/
252 /*************************************************************************/
255 * Create a publisher that sends each message (or group of messages)
256 * immediately. Most applications should favor higher latency for much
257 * higher message throughput and the "simple publisher" is not a good
261 * The host used in the URL to MR. Can be "host:port", can be
262 * multiple comma-separated entries.
264 * The topic on which to publish messages.
265 * @return a publisher
267 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
268 return createBatchingPublisher(hostlist, topic, 1, 1);
272 * Create a publisher that batches messages. Be sure to close the publisher
273 * to send the last batch and ensure a clean shutdown. Message payloads are
277 * The host used in the URL to MR. Can be "host:port", can be
278 * multiple comma-separated entries.
280 * The topic on which to publish messages.
281 * @param maxBatchSize
282 * The largest set of messages to batch
284 * The maximum age of a message waiting in a batch
286 * @return a publisher
288 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
290 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
294 * Create a publisher that batches messages. Be sure to close the publisher
295 * to send the last batch and ensure a clean shutdown.
298 * The host used in the URL to MR. Can be "host:port", can be
299 * multiple comma-separated entries.
301 * The topic on which to publish messages.
302 * @param maxBatchSize
303 * The largest set of messages to batch
305 * The maximum age of a message waiting in a batch
307 * use gzip compression
309 * @return a publisher
311 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
312 long maxAgeMs, boolean compress) {
313 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
317 * Create a publisher that batches messages. Be sure to close the publisher
318 * to send the last batch and ensure a clean shutdown.
321 * A set of hosts to be used in the URL to MR. Can be
322 * "host:port". Use multiple entries to enable failover.
324 * The topic on which to publish messages.
325 * @param maxBatchSize
326 * The largest set of messages to batch
328 * The maximum age of a message waiting in a batch
330 * use gzip compression
332 * @return a publisher
334 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
335 long maxAgeMs, boolean compress) {
336 final TreeSet<String> hosts = new TreeSet<String>();
337 for (String hp : hostSet) {
340 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
344 * Create a publisher that batches messages. Be sure to close the publisher
345 * to send the last batch and ensure a clean shutdown.
348 * A set of hosts to be used in the URL to MR. Can be
349 * "host:port". Use multiple entries to enable failover.
351 * The topic on which to publish messages.
352 * @param maxBatchSize
353 * The largest set of messages to batch
355 * The maximum age of a message waiting in a batch
357 * use gzip compression
359 * @return a publisher
361 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
362 int maxBatchSize, long maxAgeMs, boolean compress) {
363 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
364 .compress(compress).build();
368 * Create a publisher that batches messages. Be sure to close the publisher
369 * to send the last batch and ensure a clean shutdown.
372 * A host to be used in the URL to MR. Can be "host:port". Use
373 * multiple entries to enable failover.
375 * The topic on which to publish messages.
380 * @param maxBatchSize
381 * The largest set of messages to batch
383 * The maximum age of a message waiting in a batch
385 * use gzip compression
386 * @param protocolFlag
387 * http auth or ueb auth or dme2 method
388 * @param producerFilePath
389 * all properties for publisher
390 * @return MRBatchingPublisher obj
392 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
393 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
394 String producerFilePath) {
395 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
396 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
397 .compress(compress).build();
400 pub.setUsername(username);
401 pub.setPassword(password);
402 pub.setProtocolFlag(protocolFlag);
407 * Create a publisher that batches messages. Be sure to close the publisher
408 * to send the last batch and ensure a clean shutdown
411 * props set all properties for publishing message
412 * @return MRBatchingPublisher obj
413 * @throws FileNotFoundException
415 * @throws IOException
418 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
419 throws FileNotFoundException, IOException {
420 return createInternalBatchingPublisher(props, withResponse);
424 * Create a publisher that batches messages. Be sure to close the publisher
425 * to send the last batch and ensure a clean shutdown
428 * props set all properties for publishing message
429 * @return MRBatchingPublisher obj
430 * @throws FileNotFoundException
432 * @throws IOException
435 public static MRBatchingPublisher createBatchingPublisher(Properties props)
436 throws FileNotFoundException, IOException {
437 return createInternalBatchingPublisher(props, false);
441 * Create a publisher that batches messages. Be sure to close the publisher
442 * to send the last batch and ensure a clean shutdown
444 * @param producerFilePath
445 * set all properties for publishing message
446 * @return MRBatchingPublisher obj
447 * @throws FileNotFoundException
449 * @throws IOException
452 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
453 throws FileNotFoundException, IOException {
454 FileReader reader = new FileReader(new File(producerFilePath));
455 Properties props = new Properties();
457 return createBatchingPublisher(props);
461 * Create a publisher that will contain send methods that return response
464 * @param producerFilePath
465 * set all properties for publishing message
466 * @return MRBatchingPublisher obj
467 * @throws FileNotFoundException
469 * @throws IOException
472 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
473 throws FileNotFoundException, IOException {
474 FileReader reader = new FileReader(new File(producerFilePath));
475 Properties props = new Properties();
477 return createBatchingPublisher(props, withResponse);
480 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
481 throws FileNotFoundException, IOException {
482 assert props != null;
483 MRSimplerBatchPublisher pub;
485 pub = new MRSimplerBatchPublisher.Builder()
486 .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
487 .onTopic(props.getProperty("topic"))
488 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
489 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
490 .compress(Boolean.parseBoolean(props.getProperty("compress")))
491 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
492 .withResponse(withResponse).build();
494 pub = new MRSimplerBatchPublisher.Builder()
495 .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
496 .onTopic(props.getProperty("topic"))
497 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
498 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
499 .compress(Boolean.parseBoolean(props.getProperty("compress")))
500 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
502 pub.setHost(props.getProperty("host"));
503 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
505 pub.setAuthKey(props.getProperty("authKey"));
506 pub.setAuthDate(props.getProperty("authDate"));
507 pub.setUsername(props.getProperty("username"));
508 pub.setPassword(props.getProperty("password"));
510 pub.setUsername(props.getProperty("username"));
511 pub.setPassword(props.getProperty("password"));
513 pub.setProtocolFlag(props.getProperty("TransportType"));
515 routeFilePath = props.getProperty("DME2preferredRouterFilePath");
516 routeReader = new FileReader(new File(routeFilePath));
517 prop = new Properties();
518 File fo = new File(routeFilePath);
520 routeWriter = new FileWriter(new File(routeFilePath));
526 * Create an identity manager client to work with API keys.
529 * A set of hosts to be used in the URL to MR. Can be
530 * "host:port". Use multiple entries to enable failover.
535 * @return an identity manager
537 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
538 MRIdentityManager cim;
540 cim = new MRMetaClient(hostSet);
541 } catch (MalformedURLException e) {
542 throw new IllegalArgumentException(e);
544 cim.setApiCredentials(apiKey, apiSecret);
549 * Create a topic manager for working with topics.
552 * A set of hosts to be used in the URL to MR. Can be
553 * "host:port". Use multiple entries to enable failover.
558 * @return a topic manager
560 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
563 tmi = new MRMetaClient(hostSet);
564 } catch (MalformedURLException e) {
565 throw new RuntimeException(e);
567 tmi.setApiCredentials(apiKey, apiSecret);
572 * Inject a consumer. Used to support unit tests.
576 public static void $testInject(MRConsumer cc) {
577 MRClientBuilders.sfConsumerMock = cc;
580 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
581 String id, int i, int j, String protocalFlag, String consumerFilePath) {
585 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
586 } catch (MalformedURLException e) {
587 throw new IllegalArgumentException(e);
589 sub.setUsername(username);
590 sub.setPassword(password);
592 sub.setProtocolFlag(protocalFlag);
593 sub.setConsumerFilePath(consumerFilePath);
598 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
599 String id, String protocalFlag, String consumerFilePath, int i, int j) {
603 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
604 } catch (MalformedURLException e) {
605 throw new RuntimeException(e);
607 sub.setUsername(username);
608 sub.setPassword(password);
610 sub.setProtocolFlag(protocalFlag);
611 sub.setConsumerFilePath(consumerFilePath);
616 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
617 FileReader reader = new FileReader(new File(consumerFilePath));
618 Properties props = new Properties();
621 return createConsumer(props);
624 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
626 if (props.getProperty("timeout") != null)
627 timeout = Integer.parseInt(props.getProperty("timeout"));
631 if (props.getProperty("limit") != null)
632 limit = Integer.parseInt(props.getProperty("limit"));
636 if (props.getProperty("group") == null)
637 group = UUID.randomUUID().toString();
639 group = props.getProperty("group");
640 MRConsumerImpl sub = null;
641 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
642 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
643 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
644 props.getProperty("authKey"), props.getProperty("authDate"));
645 sub.setAuthKey(props.getProperty("authKey"));
646 sub.setAuthDate(props.getProperty("authDate"));
647 sub.setUsername(props.getProperty("username"));
648 sub.setPassword(props.getProperty("password"));
650 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
651 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
652 props.getProperty("username"), props.getProperty("password"));
653 sub.setUsername(props.getProperty("username"));
654 sub.setPassword(props.getProperty("password"));
656 sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
658 sub.setHost(props.getProperty("host"));
659 sub.setProtocolFlag(props.getProperty("TransportType"));
660 sub.setfFilter(props.getProperty("filter"));
661 routeFilePath = props.getProperty("DME2preferredRouterFilePath");
662 routeReader = new FileReader(new File(routeFilePath));
663 prop = new Properties();
664 File fo = new File(routeFilePath);
666 routeWriter = new FileWriter(new File(routeFilePath));