1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
24 package org.onap.dmaap.mr.client;
27 import java.net.MalformedURLException;
28 import java.util.Collection;
30 import java.util.Properties;
31 import java.util.TreeSet;
32 import java.util.UUID;
33 import javax.ws.rs.core.MultivaluedMap;
34 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
35 import org.onap.dmaap.mr.client.impl.MRMetaClient;
36 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
37 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
38 import org.onap.dmaap.mr.tools.ValidatorUtil;
41 * A factory for MR clients.<br/>
43 * Use caution selecting a consumer creator factory. If the call doesn't accept
44 * a consumer group name, then it creates a consumer that is not restartable.
45 * That is, if you stop your process and start it again, your client will NOT
46 * receive any missed messages on the topic. If you need to ensure receipt of
47 * missed messages, then you must use a consumer that's created with a group
48 * name and ID. (If you create multiple consumer processes using the same group,
49 * load is split across them. Be sure to use a different ID for each
56 public class MRClientFactory {
57 private static final String AUTH_KEY = "authKey";
58 private static final String AUTH_DATE = "authDate";
59 private static final String PASSWORD = "password";
60 private static final String USERNAME = "username";
61 private static final String FILTER = "filter";
62 private static final String HOST = "host";
63 private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
64 private static final String TOPIC = "topic";
65 private static final String TRANSPORT_TYPE = "TransportType";
67 private static MultivaluedMap<String, Object> httpHeadersMap;
68 public static Map<String, String> DME2HeadersMap;
69 public static String routeFilePath;
71 public static FileReader routeReader;
73 public static FileWriter routeWriter = null;
74 public static Properties prop = null;
77 * Instantiates MRClientFactory.
79 private MRClientFactory() {
80 //prevents instantiation.
84 * Add getter to avoid direct access to static header map.
87 public static MultivaluedMap<String, Object> getHTTPHeadersMap() {
88 return httpHeadersMap;
92 * Add setter to avoid direct access to static header map.
95 public static void setHTTPHeadersMap(MultivaluedMap<String, Object> headers) {
96 httpHeadersMap = headers;
100 * Create a consumer instance with the default timeout and no limit on
101 * messages returned. This consumer operates as an independent consumer
102 * (i.e., not in a group) and is NOT re-startable across sessions.
105 * A comma separated list of hosts to use to connect to MR. You
106 * can include port numbers (3904 is the default). For example,
110 * The topic to consume
114 public static MRConsumer createConsumer(String hostList, String topic) {
115 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
119 * Create a consumer instance with the default timeout and no limit on
120 * messages returned. This consumer operates as an independent consumer
121 * (i.e., not in a group) and is NOT re-startable across sessions.
124 * The host used in the URL to MR. Entries can be "host:port".
126 * The topic to consume
130 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
131 return createConsumer(hostSet, topic, null);
135 * Create a consumer instance with server-side filtering, the default
136 * timeout, and no limit on messages returned. This consumer operates as an
137 * independent consumer (i.e., not in a group) and is NOT re-startable
141 * The host used in the URL to MR. Entries can be "host:port".
143 * The topic to consume
145 * a filter to use on the server side
149 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
150 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
154 * Create a consumer instance with the default timeout, and no limit on
155 * messages returned. This consumer can operate in a logical group and is
156 * re-startable across sessions when you use the same group and ID on
160 * The host used in the URL to MR. Entries can be "host:port".
162 * The topic to consume
163 * @param consumerGroup
164 * The name of the consumer group this consumer is part of
166 * The unique id of this consume in its group
170 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
171 final String consumerId) {
172 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
176 * Create a consumer instance with the default timeout, and no limit on
177 * messages returned. This consumer can operate in a logical group and is
178 * re-startable across sessions when you use the same group and ID on
182 * The host used in the URL to MR. Entries can be "host:port".
184 * The topic to consume
185 * @param consumerGroup
186 * The name of the consumer group this consumer is part of
188 * The unique id of this consume in its group
190 * The amount of time in milliseconds that the server should keep
191 * the connection open while waiting for message traffic. Use -1
192 * for default timeout.
194 * A limit on the number of messages returned in a single call.
195 * Use -1 for no limit.
199 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
200 final String consumerId, int timeoutMs, int limit) {
201 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
205 * Create a consumer instance with the default timeout, and no limit on
206 * messages returned. This consumer can operate in a logical group and is
207 * re-startable across sessions when you use the same group and ID on
208 * restart. This consumer also uses server-side filtering.
211 * A comma separated list of hosts to use to connect to MR. You
212 * can include port numbers (3904 is the default)"
214 * The topic to consume
215 * @param consumerGroup
216 * The name of the consumer group this consumer is part of
218 * The unique id of this consume in its group
220 * The amount of time in milliseconds that the server should keep
221 * the connection open while waiting for message traffic. Use -1
222 * for default timeout.
224 * A limit on the number of messages returned in a single call.
225 * Use -1 for no limit.
227 * A Highland Park filter expression using only built-in filter
228 * components. Use null for "no filter".
232 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
233 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
234 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
235 filter, apiKey, apiSecret);
239 * Create a consumer instance with the default timeout, and no limit on
240 * messages returned. This consumer can operate in a logical group and is
241 * re-startable across sessions when you use the same group and ID on
242 * restart. This consumer also uses server-side filtering.
245 * The host used in the URL to MR. Entries can be "host:port".
247 * The topic to consume
248 * @param consumerGroup
249 * The name of the consumer group this consumer is part of
251 * The unique id of this consume in its group
253 * The amount of time in milliseconds that the server should keep
254 * the connection open while waiting for message traffic. Use -1
255 * for default timeout.
257 * A limit on the number of messages returned in a single call.
258 * Use -1 for no limit.
260 * A Highland Park filter expression using only built-in filter
261 * components. Use null for "no filter".
265 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
266 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
267 if (MRClientBuilders.sfConsumerMock != null)
268 return MRClientBuilders.sfConsumerMock;
270 return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
271 .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
272 .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
273 .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
274 .createMRConsumerImpl();
275 } catch (MalformedURLException e) {
276 throw new IllegalArgumentException(e);
280 /*************************************************************************/
281 /*************************************************************************/
282 /*************************************************************************/
285 * Create a publisher that sends each message (or group of messages)
286 * immediately. Most applications should favor higher latency for much
287 * higher message throughput and the "simple publisher" is not a good
291 * The host used in the URL to MR. Can be "host:port", can be
292 * multiple comma-separated entries.
294 * The topic on which to publish messages.
295 * @return a publisher
297 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
298 return createBatchingPublisher(hostlist, topic, 1, 1);
302 * Create a publisher that batches messages. Be sure to close the publisher
303 * to send the last batch and ensure a clean shutdown. Message payloads are
307 * The host used in the URL to MR. Can be "host:port", can be
308 * multiple comma-separated entries.
310 * The topic on which to publish messages.
311 * @param maxBatchSize
312 * The largest set of messages to batch
314 * The maximum age of a message waiting in a batch
316 * @return a publisher
318 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
320 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
324 * Create a publisher that batches messages. Be sure to close the publisher
325 * to send the last batch and ensure a clean shutdown.
328 * The host used in the URL to MR. Can be "host:port", can be
329 * multiple comma-separated entries.
331 * The topic on which to publish messages.
332 * @param maxBatchSize
333 * The largest set of messages to batch
335 * The maximum age of a message waiting in a batch
337 * use gzip compression
339 * @return a publisher
341 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
342 long maxAgeMs, boolean compress) {
343 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
347 * Create a publisher that batches messages. Be sure to close the publisher
348 * to send the last batch and ensure a clean shutdown.
351 * A set of hosts to be used in the URL to MR. Can be
352 * "host:port". Use multiple entries to enable failover.
354 * The topic on which to publish messages.
355 * @param maxBatchSize
356 * The largest set of messages to batch
358 * The maximum age of a message waiting in a batch
360 * use gzip compression
362 * @return a publisher
364 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
365 long maxAgeMs, boolean compress) {
366 final TreeSet<String> hosts = new TreeSet<>();
367 for (String hp : hostSet) {
370 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
374 * Create a publisher that batches messages. Be sure to close the publisher
375 * to send the last batch and ensure a clean shutdown.
378 * A set of hosts to be used in the URL to MR. Can be
379 * "host:port". Use multiple entries to enable failover.
381 * The topic on which to publish messages.
382 * @param maxBatchSize
383 * The largest set of messages to batch
385 * The maximum age of a message waiting in a batch
387 * use gzip compression
389 * @return a publisher
391 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
392 int maxBatchSize, long maxAgeMs, boolean compress) {
393 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
394 .compress(compress).build();
398 * Create a publisher that batches messages. Be sure to close the publisher
399 * to send the last batch and ensure a clean shutdown.
402 * A host to be used in the URL to MR. Can be "host:port". Use
403 * multiple entries to enable failover.
405 * The topic on which to publish messages.
410 * @param maxBatchSize
411 * The largest set of messages to batch
413 * The maximum age of a message waiting in a batch
415 * use gzip compression
416 * @param protocolFlag
417 * http auth or ueb auth or dme2 method
418 * @return MRBatchingPublisher obj
420 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
421 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
422 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
423 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
424 .compress(compress).build();
427 pub.setUsername(username);
428 pub.setPassword(password);
429 pub.setProtocolFlag(protocolFlag);
434 * Create a publisher that batches messages. Be sure to close the publisher
435 * to send the last batch and ensure a clean shutdown
438 * props set all properties for publishing message
439 * @return MRBatchingPublisher obj
440 * @throws FileNotFoundException
442 * @throws IOException
445 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
446 throws FileNotFoundException, IOException {
447 return createInternalBatchingPublisher(props, withResponse);
451 * Create a publisher that batches messages. Be sure to close the publisher
452 * to send the last batch and ensure a clean shutdown
455 * props set all properties for publishing message
456 * @return MRBatchingPublisher obj
457 * @throws FileNotFoundException
459 * @throws IOException
462 public static MRBatchingPublisher createBatchingPublisher(Properties props)
463 throws FileNotFoundException, IOException {
464 return createInternalBatchingPublisher(props, false);
468 * Create a publisher that batches messages. Be sure to close the publisher
469 * to send the last batch and ensure a clean shutdown
471 * @param producerFilePath
472 * set all properties for publishing message
473 * @return MRBatchingPublisher obj
474 * @throws FileNotFoundException
476 * @throws IOException
479 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
480 throws FileNotFoundException, IOException {
481 Properties props = new Properties();
482 try(InputStream input = new FileInputStream(producerFilePath)) {
485 return createBatchingPublisher(props);
489 * Create a publisher that will contain send methods that return response
492 * @param producerFilePath
493 * set all properties for publishing message
494 * @return MRBatchingPublisher obj
495 * @throws FileNotFoundException
497 * @throws IOException
500 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
501 throws FileNotFoundException, IOException {
502 Properties props = new Properties();
503 try(InputStream input = new FileInputStream(producerFilePath)) {
506 return createBatchingPublisher(props, withResponse);
509 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
510 throws FileNotFoundException, IOException {
511 assert props != null;
512 MRSimplerBatchPublisher pub;
514 pub = new MRSimplerBatchPublisher.Builder()
515 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
516 .onTopic(props.getProperty(TOPIC))
517 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
518 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
519 .compress(Boolean.parseBoolean(props.getProperty("compress")))
520 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
521 .withResponse(withResponse).build();
523 pub = new MRSimplerBatchPublisher.Builder()
524 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty(HOST)), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
525 .onTopic(props.getProperty(TOPIC))
526 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
527 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
528 .compress(Boolean.parseBoolean(props.getProperty("compress")))
529 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
531 pub.setHost(props.getProperty(HOST));
532 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
534 pub.setAuthKey(props.getProperty(AUTH_KEY));
535 pub.setAuthDate(props.getProperty(AUTH_DATE));
536 pub.setUsername(props.getProperty(USERNAME));
537 pub.setPassword(props.getProperty(PASSWORD));
539 pub.setUsername(props.getProperty(USERNAME));
540 pub.setPassword(props.getProperty(PASSWORD));
542 pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
544 prop = new Properties();
545 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
546 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
547 routeReader = new FileReader(new File(routeFilePath));
548 File fo = new File(routeFilePath);
550 routeWriter = new FileWriter(new File(routeFilePath));
557 * Create an identity manager client to work with API keys.
560 * A set of hosts to be used in the URL to MR. Can be
561 * "host:port". Use multiple entries to enable failover.
566 * @return an identity manager
568 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
569 MRIdentityManager cim;
571 cim = new MRMetaClient(hostSet);
572 } catch (MalformedURLException e) {
573 throw new IllegalArgumentException(e);
575 cim.setApiCredentials(apiKey, apiSecret);
580 * Create a topic manager for working with topics.
583 * A set of hosts to be used in the URL to MR. Can be
584 * "host:port". Use multiple entries to enable failover.
589 * @return a topic manager
591 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
594 tmi = new MRMetaClient(hostSet);
595 } catch (MalformedURLException e) {
596 throw new IllegalArgumentException(e);
598 tmi.setApiCredentials(apiKey, apiSecret);
603 * Inject a consumer. Used to support unit tests.
607 public static void $testInject(MRConsumer cc) {
608 MRClientBuilders.sfConsumerMock = cc;
611 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
612 String id, int i, int j, String protocalFlag, String consumerFilePath) {
616 sub = new MRConsumerImpl.MRConsumerImplBuilder()
617 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
618 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
619 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
620 .createMRConsumerImpl();
621 } catch (MalformedURLException e) {
622 throw new IllegalArgumentException(e);
624 sub.setUsername(username);
625 sub.setPassword(password);
627 sub.setProtocolFlag(protocalFlag);
628 sub.setConsumerFilePath(consumerFilePath);
633 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
634 String id, String protocalFlag, String consumerFilePath, int i, int j) {
638 sub = new MRConsumerImpl.MRConsumerImplBuilder()
639 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
640 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
641 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
642 .createMRConsumerImpl();
643 } catch (MalformedURLException e) {
644 throw new IllegalArgumentException(e);
646 sub.setUsername(username);
647 sub.setPassword(password);
649 sub.setProtocolFlag(protocalFlag);
650 sub.setConsumerFilePath(consumerFilePath);
655 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
656 Properties props = new Properties();
657 try(InputStream input = new FileInputStream(consumerFilePath)) {
660 return createConsumer(props);
663 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
665 ValidatorUtil.validateSubscriber(props);
666 if (props.getProperty("timeout") != null)
667 timeout = Integer.parseInt(props.getProperty("timeout"));
671 if (props.getProperty("limit") != null)
672 limit = Integer.parseInt(props.getProperty("limit"));
676 if (props.getProperty("group") == null)
677 group = UUID.randomUUID().toString();
679 group = props.getProperty("group");
680 MRConsumerImpl sub = null;
681 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
682 sub = new MRConsumerImpl.MRConsumerImplBuilder()
683 .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
684 .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
685 .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
686 .setFilter(props.getProperty(FILTER))
687 .setApiKey_username(props.getProperty(AUTH_KEY))
688 .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
689 sub.setAuthKey(props.getProperty(AUTH_KEY));
690 sub.setAuthDate(props.getProperty(AUTH_DATE));
691 sub.setUsername(props.getProperty(USERNAME));
692 sub.setPassword(props.getProperty(PASSWORD));
694 sub = new MRConsumerImpl.MRConsumerImplBuilder()
695 .setHostPart(MRConsumerImpl.stringToList(props.getProperty(HOST)))
696 .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
697 .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
698 .setFilter(props.getProperty(FILTER))
699 .setApiKey_username(props.getProperty(USERNAME))
700 .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
701 sub.setUsername(props.getProperty(USERNAME));
702 sub.setPassword(props.getProperty(PASSWORD));
706 sub.setHost(props.getProperty(HOST));
707 sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
708 sub.setfFilter(props.getProperty(FILTER));
709 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
710 MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
711 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
712 routeReader = new FileReader(new File(routeFilePath));
713 prop = new Properties();
714 File fo = new File(routeFilePath);
716 routeWriter = new FileWriter(new File(routeFilePath));