1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
24 package org.onap.dmaap.mr.client;
27 import java.io.FileNotFoundException;
28 import java.io.FileReader;
29 import java.io.FileWriter;
30 import java.io.IOException;
31 import java.net.MalformedURLException;
32 import java.util.Collection;
34 import java.util.Properties;
35 import java.util.TreeSet;
36 import java.util.UUID;
38 import javax.ws.rs.core.MultivaluedMap;
40 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
41 import org.onap.dmaap.mr.client.impl.MRMetaClient;
42 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
43 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
44 import org.onap.dmaap.mr.tools.ValidatorUtil;
47 * A factory for MR clients.<br/>
49 * Use caution selecting a consumer creator factory. If the call doesn't accept
50 * a consumer group name, then it creates a consumer that is not restartable.
51 * That is, if you stop your process and start it again, your client will NOT
52 * receive any missed messages on the topic. If you need to ensure receipt of
53 * missed messages, then you must use a consumer that's created with a group
54 * name and ID. (If you create multiple consumer processes using the same group,
55 * load is split across them. Be sure to use a different ID for each
62 public class MRClientFactory {
63 private static final String AUTH_KEY = "authKey";
64 private static final String AUTH_DATE = "authDate";
65 private static final String PASSWORD = "password";
66 private static final String USERNAME = "username";
67 private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
68 private static final String TOPIC = "topic";
69 private static final String TRANSPORT_TYPE = "TransportType";
70 public static MultivaluedMap<String, Object> HTTPHeadersMap;
71 public static Map<String, String> DME2HeadersMap;
72 public static String routeFilePath;
74 public static FileReader routeReader;
76 public static FileWriter routeWriter = null;
77 public static Properties prop = null;
80 * Instantiates MRClientFactory.
82 private MRClientFactory() {
83 //prevents instantiation.
87 * Create a consumer instance with the default timeout and no limit on
88 * messages returned. This consumer operates as an independent consumer
89 * (i.e., not in a group) and is NOT re-startable across sessions.
92 * A comma separated list of hosts to use to connect to MR. You
93 * can include port numbers (3904 is the default). For example,
97 * The topic to consume
101 public static MRConsumer createConsumer(String hostList, String topic) {
102 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
106 * Create a consumer instance with the default timeout and no limit on
107 * messages returned. This consumer operates as an independent consumer
108 * (i.e., not in a group) and is NOT re-startable across sessions.
111 * The host used in the URL to MR. Entries can be "host:port".
113 * The topic to consume
117 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
118 return createConsumer(hostSet, topic, null);
122 * Create a consumer instance with server-side filtering, the default
123 * timeout, and no limit on messages returned. This consumer operates as an
124 * independent consumer (i.e., not in a group) and is NOT re-startable
128 * The host used in the URL to MR. Entries can be "host:port".
130 * The topic to consume
132 * a filter to use on the server side
136 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
137 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
141 * Create a consumer instance with the default timeout, and no limit on
142 * messages returned. This consumer can operate in a logical group and is
143 * re-startable across sessions when you use the same group and ID on
147 * The host used in the URL to MR. Entries can be "host:port".
149 * The topic to consume
150 * @param consumerGroup
151 * The name of the consumer group this consumer is part of
153 * The unique id of this consume in its group
157 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
158 final String consumerId) {
159 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
163 * Create a consumer instance with the default timeout, and no limit on
164 * messages returned. This consumer can operate in a logical group and is
165 * re-startable across sessions when you use the same group and ID on
169 * The host used in the URL to MR. Entries can be "host:port".
171 * The topic to consume
172 * @param consumerGroup
173 * The name of the consumer group this consumer is part of
175 * The unique id of this consume in its group
177 * The amount of time in milliseconds that the server should keep
178 * the connection open while waiting for message traffic. Use -1
179 * for default timeout.
181 * A limit on the number of messages returned in a single call.
182 * Use -1 for no limit.
186 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
187 final String consumerId, int timeoutMs, int limit) {
188 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
192 * Create a consumer instance with the default timeout, and no limit on
193 * messages returned. This consumer can operate in a logical group and is
194 * re-startable across sessions when you use the same group and ID on
195 * restart. This consumer also uses server-side filtering.
198 * A comma separated list of hosts to use to connect to MR. You
199 * can include port numbers (3904 is the default). For example,
200 * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
202 * The topic to consume
203 * @param consumerGroup
204 * The name of the consumer group this consumer is part of
206 * The unique id of this consume in its group
208 * The amount of time in milliseconds that the server should keep
209 * the connection open while waiting for message traffic. Use -1
210 * for default timeout.
212 * A limit on the number of messages returned in a single call.
213 * Use -1 for no limit.
215 * A Highland Park filter expression using only built-in filter
216 * components. Use null for "no filter".
220 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
221 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
222 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
223 filter, apiKey, apiSecret);
227 * Create a consumer instance with the default timeout, and no limit on
228 * messages returned. This consumer can operate in a logical group and is
229 * re-startable across sessions when you use the same group and ID on
230 * restart. This consumer also uses server-side filtering.
233 * The host used in the URL to MR. Entries can be "host:port".
235 * The topic to consume
236 * @param consumerGroup
237 * The name of the consumer group this consumer is part of
239 * The unique id of this consume in its group
241 * The amount of time in milliseconds that the server should keep
242 * the connection open while waiting for message traffic. Use -1
243 * for default timeout.
245 * A limit on the number of messages returned in a single call.
246 * Use -1 for no limit.
248 * A Highland Park filter expression using only built-in filter
249 * components. Use null for "no filter".
253 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
254 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
255 if (MRClientBuilders.sfConsumerMock != null)
256 return MRClientBuilders.sfConsumerMock;
258 return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
260 } catch (MalformedURLException e) {
261 throw new IllegalArgumentException(e);
265 /*************************************************************************/
266 /*************************************************************************/
267 /*************************************************************************/
270 * Create a publisher that sends each message (or group of messages)
271 * immediately. Most applications should favor higher latency for much
272 * higher message throughput and the "simple publisher" is not a good
276 * The host used in the URL to MR. Can be "host:port", can be
277 * multiple comma-separated entries.
279 * The topic on which to publish messages.
280 * @return a publisher
282 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
283 return createBatchingPublisher(hostlist, topic, 1, 1);
287 * Create a publisher that batches messages. Be sure to close the publisher
288 * to send the last batch and ensure a clean shutdown. Message payloads are
292 * The host used in the URL to MR. Can be "host:port", can be
293 * multiple comma-separated entries.
295 * The topic on which to publish messages.
296 * @param maxBatchSize
297 * The largest set of messages to batch
299 * The maximum age of a message waiting in a batch
301 * @return a publisher
303 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
305 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
309 * Create a publisher that batches messages. Be sure to close the publisher
310 * to send the last batch and ensure a clean shutdown.
313 * The host used in the URL to MR. Can be "host:port", can be
314 * multiple comma-separated entries.
316 * The topic on which to publish messages.
317 * @param maxBatchSize
318 * The largest set of messages to batch
320 * The maximum age of a message waiting in a batch
322 * use gzip compression
324 * @return a publisher
326 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
327 long maxAgeMs, boolean compress) {
328 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
332 * Create a publisher that batches messages. Be sure to close the publisher
333 * to send the last batch and ensure a clean shutdown.
336 * A set of hosts to be used in the URL to MR. Can be
337 * "host:port". Use multiple entries to enable failover.
339 * The topic on which to publish messages.
340 * @param maxBatchSize
341 * The largest set of messages to batch
343 * The maximum age of a message waiting in a batch
345 * use gzip compression
347 * @return a publisher
349 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
350 long maxAgeMs, boolean compress) {
351 final TreeSet<String> hosts = new TreeSet<>();
352 for (String hp : hostSet) {
355 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
359 * Create a publisher that batches messages. Be sure to close the publisher
360 * to send the last batch and ensure a clean shutdown.
363 * A set of hosts to be used in the URL to MR. Can be
364 * "host:port". Use multiple entries to enable failover.
366 * The topic on which to publish messages.
367 * @param maxBatchSize
368 * The largest set of messages to batch
370 * The maximum age of a message waiting in a batch
372 * use gzip compression
374 * @return a publisher
376 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
377 int maxBatchSize, long maxAgeMs, boolean compress) {
378 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
379 .compress(compress).build();
383 * Create a publisher that batches messages. Be sure to close the publisher
384 * to send the last batch and ensure a clean shutdown.
387 * A host to be used in the URL to MR. Can be "host:port". Use
388 * multiple entries to enable failover.
390 * The topic on which to publish messages.
395 * @param maxBatchSize
396 * The largest set of messages to batch
398 * The maximum age of a message waiting in a batch
400 * use gzip compression
401 * @param protocolFlag
402 * http auth or ueb auth or dme2 method
403 * @param producerFilePath
404 * all properties for publisher
405 * @return MRBatchingPublisher obj
407 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
408 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
409 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
410 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
411 .compress(compress).build();
414 pub.setUsername(username);
415 pub.setPassword(password);
416 pub.setProtocolFlag(protocolFlag);
421 * Create a publisher that batches messages. Be sure to close the publisher
422 * to send the last batch and ensure a clean shutdown
425 * props set all properties for publishing message
426 * @return MRBatchingPublisher obj
427 * @throws FileNotFoundException
429 * @throws IOException
432 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
433 throws FileNotFoundException, IOException {
434 return createInternalBatchingPublisher(props, withResponse);
438 * Create a publisher that batches messages. Be sure to close the publisher
439 * to send the last batch and ensure a clean shutdown
442 * props set all properties for publishing message
443 * @return MRBatchingPublisher obj
444 * @throws FileNotFoundException
446 * @throws IOException
449 public static MRBatchingPublisher createBatchingPublisher(Properties props)
450 throws FileNotFoundException, IOException {
451 return createInternalBatchingPublisher(props, false);
455 * Create a publisher that batches messages. Be sure to close the publisher
456 * to send the last batch and ensure a clean shutdown
458 * @param producerFilePath
459 * set all properties for publishing message
460 * @return MRBatchingPublisher obj
461 * @throws FileNotFoundException
463 * @throws IOException
466 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
467 throws FileNotFoundException, IOException {
468 FileReader reader = new FileReader(new File(producerFilePath));
469 Properties props = new Properties();
471 return createBatchingPublisher(props);
475 * Create a publisher that will contain send methods that return response
478 * @param producerFilePath
479 * set all properties for publishing message
480 * @return MRBatchingPublisher obj
481 * @throws FileNotFoundException
483 * @throws IOException
486 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
487 throws FileNotFoundException, IOException {
488 FileReader reader = new FileReader(new File(producerFilePath));
489 Properties props = new Properties();
491 return createBatchingPublisher(props, withResponse);
494 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
495 throws FileNotFoundException, IOException {
496 assert props != null;
497 MRSimplerBatchPublisher pub;
499 pub = new MRSimplerBatchPublisher.Builder()
500 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
501 .onTopic(props.getProperty(TOPIC))
502 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
503 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
504 .compress(Boolean.parseBoolean(props.getProperty("compress")))
505 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
506 .withResponse(withResponse).build();
508 pub = new MRSimplerBatchPublisher.Builder()
509 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
510 .onTopic(props.getProperty(TOPIC))
511 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
512 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
513 .compress(Boolean.parseBoolean(props.getProperty("compress")))
514 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
516 pub.setHost(props.getProperty("host"));
517 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
519 pub.setAuthKey(props.getProperty(AUTH_KEY));
520 pub.setAuthDate(props.getProperty(AUTH_DATE));
521 pub.setUsername(props.getProperty(USERNAME));
522 pub.setPassword(props.getProperty(PASSWORD));
524 pub.setUsername(props.getProperty(USERNAME));
525 pub.setPassword(props.getProperty(PASSWORD));
527 pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
529 prop = new Properties();
530 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
531 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
532 routeReader = new FileReader(new File(routeFilePath));
533 File fo = new File(routeFilePath);
535 routeWriter = new FileWriter(new File(routeFilePath));
542 * Create an identity manager client to work with API keys.
545 * A set of hosts to be used in the URL to MR. Can be
546 * "host:port". Use multiple entries to enable failover.
551 * @return an identity manager
553 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
554 MRIdentityManager cim;
556 cim = new MRMetaClient(hostSet);
557 } catch (MalformedURLException e) {
558 throw new IllegalArgumentException(e);
560 cim.setApiCredentials(apiKey, apiSecret);
565 * Create a topic manager for working with topics.
568 * A set of hosts to be used in the URL to MR. Can be
569 * "host:port". Use multiple entries to enable failover.
574 * @return a topic manager
576 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
579 tmi = new MRMetaClient(hostSet);
580 } catch (MalformedURLException e) {
581 throw new IllegalArgumentException(e);
583 tmi.setApiCredentials(apiKey, apiSecret);
588 * Inject a consumer. Used to support unit tests.
592 public static void $testInject(MRConsumer cc) {
593 MRClientBuilders.sfConsumerMock = cc;
596 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
597 String id, int i, int j, String protocalFlag, String consumerFilePath) {
601 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
602 } catch (MalformedURLException e) {
603 throw new IllegalArgumentException(e);
605 sub.setUsername(username);
606 sub.setPassword(password);
608 sub.setProtocolFlag(protocalFlag);
609 sub.setConsumerFilePath(consumerFilePath);
614 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
615 String id, String protocalFlag, String consumerFilePath, int i, int j) {
619 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
620 } catch (MalformedURLException e) {
621 throw new IllegalArgumentException(e);
623 sub.setUsername(username);
624 sub.setPassword(password);
626 sub.setProtocolFlag(protocalFlag);
627 sub.setConsumerFilePath(consumerFilePath);
632 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
633 FileReader reader = new FileReader(new File(consumerFilePath));
634 Properties props = new Properties();
637 return createConsumer(props);
640 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
642 ValidatorUtil.validateSubscriber(props);
643 if (props.getProperty("timeout") != null)
644 timeout = Integer.parseInt(props.getProperty("timeout"));
648 if (props.getProperty("limit") != null)
649 limit = Integer.parseInt(props.getProperty("limit"));
653 if (props.getProperty("group") == null)
654 group = UUID.randomUUID().toString();
656 group = props.getProperty("group");
657 MRConsumerImpl sub = null;
658 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
659 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
660 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
661 props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
662 sub.setAuthKey(props.getProperty(AUTH_KEY));
663 sub.setAuthDate(props.getProperty(AUTH_DATE));
664 sub.setUsername(props.getProperty(USERNAME));
665 sub.setPassword(props.getProperty(PASSWORD));
667 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
668 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
669 props.getProperty(USERNAME), props.getProperty(PASSWORD));
670 sub.setUsername(props.getProperty(USERNAME));
671 sub.setPassword(props.getProperty(PASSWORD));
675 sub.setHost(props.getProperty("host"));
676 sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
677 sub.setfFilter(props.getProperty("filter"));
678 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
679 MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
680 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
681 routeReader = new FileReader(new File(routeFilePath));
682 prop = new Properties();
683 File fo = new File(routeFilePath);
685 routeWriter = new FileWriter(new File(routeFilePath));