1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client;
25 import java.io.FileNotFoundException;
26 import java.io.FileReader;
27 import java.io.FileWriter;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.util.Collection;
32 import java.util.Properties;
33 import java.util.TreeSet;
34 import java.util.UUID;
36 import javax.ws.rs.core.MultivaluedMap;
38 import com.att.nsa.mr.client.impl.MRConsumerImpl;
39 import com.att.nsa.mr.client.impl.MRMetaClient;
40 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42 import com.att.nsa.mr.tools.ValidatorUtil;
45 * A factory for MR clients.<br/>
47 * Use caution selecting a consumer creator factory. If the call doesn't accept
48 * a consumer group name, then it creates a consumer that is not restartable.
49 * That is, if you stop your process and start it again, your client will NOT
50 * receive any missed messages on the topic. If you need to ensure receipt of
51 * missed messages, then you must use a consumer that's created with a group
52 * name and ID. (If you create multiple consumer processes using the same group,
53 * load is split across them. Be sure to use a different ID for each
60 public class MRClientFactory {
61 public static MultivaluedMap<String, Object> HTTPHeadersMap;
62 public static Map<String, String> DME2HeadersMap;
63 public static String routeFilePath;
65 public static FileReader routeReader;
67 public static FileWriter routeWriter = null;
68 public static Properties prop = null;
71 // props= new Properties();
73 * Create a consumer instance with the default timeout and no limit on
74 * messages returned. This consumer operates as an independent consumer
75 * (i.e., not in a group) and is NOT re-startable across sessions.
78 * A comma separated list of hosts to use to connect to MR. You
79 * can include port numbers (3904 is the default). For example,
83 * The topic to consume
87 public static MRConsumer createConsumer(String hostList, String topic) {
88 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
92 * Create a consumer instance with the default timeout and no limit on
93 * messages returned. This consumer operates as an independent consumer
94 * (i.e., not in a group) and is NOT re-startable across sessions.
97 * The host used in the URL to MR. Entries can be "host:port".
99 * The topic to consume
103 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
104 return createConsumer(hostSet, topic, null);
108 * Create a consumer instance with server-side filtering, the default
109 * timeout, and no limit on messages returned. This consumer operates as an
110 * independent consumer (i.e., not in a group) and is NOT re-startable
114 * The host used in the URL to MR. Entries can be "host:port".
116 * The topic to consume
118 * a filter to use on the server side
122 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
123 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
127 * Create a consumer instance with the default timeout, and no limit on
128 * messages returned. This consumer can operate in a logical group and is
129 * re-startable across sessions when you use the same group and ID on
133 * The host used in the URL to MR. Entries can be "host:port".
135 * The topic to consume
136 * @param consumerGroup
137 * The name of the consumer group this consumer is part of
139 * The unique id of this consume in its group
143 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
144 final String consumerId) {
145 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
149 * Create a consumer instance with the default timeout, and no limit on
150 * messages returned. This consumer can operate in a logical group and is
151 * re-startable across sessions when you use the same group and ID on
155 * The host used in the URL to MR. Entries can be "host:port".
157 * The topic to consume
158 * @param consumerGroup
159 * The name of the consumer group this consumer is part of
161 * The unique id of this consume in its group
163 * The amount of time in milliseconds that the server should keep
164 * the connection open while waiting for message traffic. Use -1
165 * for default timeout.
167 * A limit on the number of messages returned in a single call.
168 * Use -1 for no limit.
172 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
173 final String consumerId, int timeoutMs, int limit) {
174 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
178 * Create a consumer instance with the default timeout, and no limit on
179 * messages returned. This consumer can operate in a logical group and is
180 * re-startable across sessions when you use the same group and ID on
181 * restart. This consumer also uses server-side filtering.
184 * A comma separated list of hosts to use to connect to MR. You
185 * can include port numbers (3904 is the default). For example,
186 * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
188 * The topic to consume
189 * @param consumerGroup
190 * The name of the consumer group this consumer is part of
192 * The unique id of this consume in its group
194 * The amount of time in milliseconds that the server should keep
195 * the connection open while waiting for message traffic. Use -1
196 * for default timeout.
198 * A limit on the number of messages returned in a single call.
199 * Use -1 for no limit.
201 * A Highland Park filter expression using only built-in filter
202 * components. Use null for "no filter".
206 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
207 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
208 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
209 filter, apiKey, apiSecret);
213 * Create a consumer instance with the default timeout, and no limit on
214 * messages returned. This consumer can operate in a logical group and is
215 * re-startable across sessions when you use the same group and ID on
216 * restart. This consumer also uses server-side filtering.
219 * The host used in the URL to MR. Entries can be "host:port".
221 * The topic to consume
222 * @param consumerGroup
223 * The name of the consumer group this consumer is part of
225 * The unique id of this consume in its group
227 * The amount of time in milliseconds that the server should keep
228 * the connection open while waiting for message traffic. Use -1
229 * for default timeout.
231 * A limit on the number of messages returned in a single call.
232 * Use -1 for no limit.
234 * A Highland Park filter expression using only built-in filter
235 * components. Use null for "no filter".
239 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
240 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
241 if (MRClientBuilders.sfConsumerMock != null)
242 return MRClientBuilders.sfConsumerMock;
244 return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
246 } catch (MalformedURLException e) {
247 throw new IllegalArgumentException(e);
251 /*************************************************************************/
252 /*************************************************************************/
253 /*************************************************************************/
256 * Create a publisher that sends each message (or group of messages)
257 * immediately. Most applications should favor higher latency for much
258 * higher message throughput and the "simple publisher" is not a good
262 * The host used in the URL to MR. Can be "host:port", can be
263 * multiple comma-separated entries.
265 * The topic on which to publish messages.
266 * @return a publisher
268 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
269 return createBatchingPublisher(hostlist, topic, 1, 1);
273 * Create a publisher that batches messages. Be sure to close the publisher
274 * to send the last batch and ensure a clean shutdown. Message payloads are
278 * The host used in the URL to MR. Can be "host:port", can be
279 * multiple comma-separated entries.
281 * The topic on which to publish messages.
282 * @param maxBatchSize
283 * The largest set of messages to batch
285 * The maximum age of a message waiting in a batch
287 * @return a publisher
289 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
291 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
295 * Create a publisher that batches messages. Be sure to close the publisher
296 * to send the last batch and ensure a clean shutdown.
299 * The host used in the URL to MR. Can be "host:port", can be
300 * multiple comma-separated entries.
302 * The topic on which to publish messages.
303 * @param maxBatchSize
304 * The largest set of messages to batch
306 * The maximum age of a message waiting in a batch
308 * use gzip compression
310 * @return a publisher
312 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
313 long maxAgeMs, boolean compress) {
314 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
318 * Create a publisher that batches messages. Be sure to close the publisher
319 * to send the last batch and ensure a clean shutdown.
322 * A set of hosts to be used in the URL to MR. Can be
323 * "host:port". Use multiple entries to enable failover.
325 * The topic on which to publish messages.
326 * @param maxBatchSize
327 * The largest set of messages to batch
329 * The maximum age of a message waiting in a batch
331 * use gzip compression
333 * @return a publisher
335 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
336 long maxAgeMs, boolean compress) {
337 final TreeSet<String> hosts = new TreeSet<String>();
338 for (String hp : hostSet) {
341 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
345 * Create a publisher that batches messages. Be sure to close the publisher
346 * to send the last batch and ensure a clean shutdown.
349 * A set of hosts to be used in the URL to MR. Can be
350 * "host:port". Use multiple entries to enable failover.
352 * The topic on which to publish messages.
353 * @param maxBatchSize
354 * The largest set of messages to batch
356 * The maximum age of a message waiting in a batch
358 * use gzip compression
360 * @return a publisher
362 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
363 int maxBatchSize, long maxAgeMs, boolean compress) {
364 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
365 .compress(compress).build();
369 * Create a publisher that batches messages. Be sure to close the publisher
370 * to send the last batch and ensure a clean shutdown.
373 * A host to be used in the URL to MR. Can be "host:port". Use
374 * multiple entries to enable failover.
376 * The topic on which to publish messages.
381 * @param maxBatchSize
382 * The largest set of messages to batch
384 * The maximum age of a message waiting in a batch
386 * use gzip compression
387 * @param protocolFlag
388 * http auth or ueb auth or dme2 method
389 * @param producerFilePath
390 * all properties for publisher
391 * @return MRBatchingPublisher obj
393 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
394 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
395 String producerFilePath) {
396 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
397 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
398 .compress(compress).build();
401 pub.setUsername(username);
402 pub.setPassword(password);
403 pub.setProtocolFlag(protocolFlag);
408 * Create a publisher that batches messages. Be sure to close the publisher
409 * to send the last batch and ensure a clean shutdown
412 * props set all properties for publishing message
413 * @return MRBatchingPublisher obj
414 * @throws FileNotFoundException
416 * @throws IOException
419 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
420 throws FileNotFoundException, IOException {
421 return createInternalBatchingPublisher(props, withResponse);
425 * Create a publisher that batches messages. Be sure to close the publisher
426 * to send the last batch and ensure a clean shutdown
429 * props set all properties for publishing message
430 * @return MRBatchingPublisher obj
431 * @throws FileNotFoundException
433 * @throws IOException
436 public static MRBatchingPublisher createBatchingPublisher(Properties props)
437 throws FileNotFoundException, IOException {
438 return createInternalBatchingPublisher(props, false);
442 * Create a publisher that batches messages. Be sure to close the publisher
443 * to send the last batch and ensure a clean shutdown
445 * @param producerFilePath
446 * set all properties for publishing message
447 * @return MRBatchingPublisher obj
448 * @throws FileNotFoundException
450 * @throws IOException
453 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
454 throws FileNotFoundException, IOException {
455 FileReader reader = new FileReader(new File(producerFilePath));
456 Properties props = new Properties();
458 return createBatchingPublisher(props);
462 * Create a publisher that will contain send methods that return response
465 * @param producerFilePath
466 * set all properties for publishing message
467 * @return MRBatchingPublisher obj
468 * @throws FileNotFoundException
470 * @throws IOException
473 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
474 throws FileNotFoundException, IOException {
475 FileReader reader = new FileReader(new File(producerFilePath));
476 Properties props = new Properties();
478 return createBatchingPublisher(props, withResponse);
481 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
482 throws FileNotFoundException, IOException {
483 assert props != null;
484 MRSimplerBatchPublisher pub;
486 pub = new MRSimplerBatchPublisher.Builder()
487 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
488 .onTopic(props.getProperty("topic"))
489 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
490 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
491 .compress(Boolean.parseBoolean(props.getProperty("compress")))
492 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
493 .withResponse(withResponse).build();
495 pub = new MRSimplerBatchPublisher.Builder()
496 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
497 .onTopic(props.getProperty("topic"))
498 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
499 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
500 .compress(Boolean.parseBoolean(props.getProperty("compress")))
501 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
503 pub.setHost(props.getProperty("host"));
504 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
506 pub.setAuthKey(props.getProperty("authKey"));
507 pub.setAuthDate(props.getProperty("authDate"));
508 pub.setUsername(props.getProperty("username"));
509 pub.setPassword(props.getProperty("password"));
511 pub.setUsername(props.getProperty("username"));
512 pub.setPassword(props.getProperty("password"));
514 pub.setProtocolFlag(props.getProperty("TransportType"));
516 prop = new Properties();
517 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
518 routeFilePath = props.getProperty("DME2preferredRouterFilePath");
519 routeReader = new FileReader(new File(routeFilePath));
520 File fo = new File(routeFilePath);
522 routeWriter = new FileWriter(new File(routeFilePath));
529 * Create an identity manager client to work with API keys.
532 * A set of hosts to be used in the URL to MR. Can be
533 * "host:port". Use multiple entries to enable failover.
538 * @return an identity manager
540 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
541 MRIdentityManager cim;
543 cim = new MRMetaClient(hostSet);
544 } catch (MalformedURLException e) {
545 throw new IllegalArgumentException(e);
547 cim.setApiCredentials(apiKey, apiSecret);
552 * Create a topic manager for working with topics.
555 * A set of hosts to be used in the URL to MR. Can be
556 * "host:port". Use multiple entries to enable failover.
561 * @return a topic manager
563 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
566 tmi = new MRMetaClient(hostSet);
567 } catch (MalformedURLException e) {
568 throw new IllegalArgumentException(e);
570 tmi.setApiCredentials(apiKey, apiSecret);
575 * Inject a consumer. Used to support unit tests.
579 public static void $testInject(MRConsumer cc) {
580 MRClientBuilders.sfConsumerMock = cc;
583 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
584 String id, int i, int j, String protocalFlag, String consumerFilePath) {
588 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
589 } catch (MalformedURLException e) {
590 throw new IllegalArgumentException(e);
592 sub.setUsername(username);
593 sub.setPassword(password);
595 sub.setProtocolFlag(protocalFlag);
596 sub.setConsumerFilePath(consumerFilePath);
601 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
602 String id, String protocalFlag, String consumerFilePath, int i, int j) {
606 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
607 } catch (MalformedURLException e) {
608 throw new IllegalArgumentException(e);
610 sub.setUsername(username);
611 sub.setPassword(password);
613 sub.setProtocolFlag(protocalFlag);
614 sub.setConsumerFilePath(consumerFilePath);
619 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
620 FileReader reader = new FileReader(new File(consumerFilePath));
621 Properties props = new Properties();
624 return createConsumer(props);
627 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
629 ValidatorUtil.validateSubscriber(props);
630 if (props.getProperty("timeout") != null)
631 timeout = Integer.parseInt(props.getProperty("timeout"));
635 if (props.getProperty("limit") != null)
636 limit = Integer.parseInt(props.getProperty("limit"));
640 if (props.getProperty("group") == null)
641 group = UUID.randomUUID().toString();
643 group = props.getProperty("group");
644 MRConsumerImpl sub = null;
645 if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
646 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
647 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
648 props.getProperty("authKey"), props.getProperty("authDate"));
649 sub.setAuthKey(props.getProperty("authKey"));
650 sub.setAuthDate(props.getProperty("authDate"));
651 sub.setUsername(props.getProperty("username"));
652 sub.setPassword(props.getProperty("password"));
654 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
655 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
656 props.getProperty("username"), props.getProperty("password"));
657 sub.setUsername(props.getProperty("username"));
658 sub.setPassword(props.getProperty("password"));
660 sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
662 sub.setHost(props.getProperty("host"));
663 sub.setProtocolFlag(props.getProperty("TransportType"));
664 sub.setfFilter(props.getProperty("filter"));
665 routeFilePath = props.getProperty("DME2preferredRouterFilePath");
666 routeReader = new FileReader(new File(routeFilePath));
667 prop = new Properties();
668 File fo = new File(routeFilePath);
670 routeWriter = new FileWriter(new File(routeFilePath));