1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
24 package org.onap.dmaap.mr.client;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.UUID;

import javax.ws.rs.core.MultivaluedMap;

import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.impl.MRMetaClient;
import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.onap.dmaap.mr.tools.ValidatorUtil;
/**
 * A factory for MR clients.<br/>
 * <br/>
 * Use caution selecting a consumer creator factory. If the call doesn't accept
 * a consumer group name, then it creates a consumer that is not restartable.
 * That is, if you stop your process and start it again, your client will NOT
 * receive any missed messages on the topic. If you need to ensure receipt of
 * missed messages, then you must use a consumer that's created with a group
 * name and ID. (If you create multiple consumer processes using the same group,
 * load is split across them. Be sure to use a different ID for each
 * consumer process.)
 */
62 public class MRClientFactory {
63 private static final String AUTH_KEY = "authKey";
64 private static final String AUTH_DATE = "authDate";
65 private static final String PASSWORD = "password";
66 private static final String USERNAME = "username";
67 private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
68 private static final String TOPIC = "topic";
69 private static final String TRANSPORT_TYPE = "TransportType";
70 public static MultivaluedMap<String, Object> HTTPHeadersMap;
71 public static Map<String, String> DME2HeadersMap;
72 public static String routeFilePath;
74 public static FileReader routeReader;
76 public static FileWriter routeWriter = null;
77 public static Properties prop = null;
80 * Instantiates MRClientFactory.
82 private MRClientFactory() {
83 //prevents instantiation.
87 * Create a consumer instance with the default timeout and no limit on
88 * messages returned. This consumer operates as an independent consumer
89 * (i.e., not in a group) and is NOT re-startable across sessions.
92 * A comma separated list of hosts to use to connect to MR. You
93 * can include port numbers (3904 is the default). For example,
97 * The topic to consume
101 public static MRConsumer createConsumer(String hostList, String topic) {
102 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
106 * Create a consumer instance with the default timeout and no limit on
107 * messages returned. This consumer operates as an independent consumer
108 * (i.e., not in a group) and is NOT re-startable across sessions.
111 * The host used in the URL to MR. Entries can be "host:port".
113 * The topic to consume
117 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
118 return createConsumer(hostSet, topic, null);
122 * Create a consumer instance with server-side filtering, the default
123 * timeout, and no limit on messages returned. This consumer operates as an
124 * independent consumer (i.e., not in a group) and is NOT re-startable
128 * The host used in the URL to MR. Entries can be "host:port".
130 * The topic to consume
132 * a filter to use on the server side
136 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
137 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
141 * Create a consumer instance with the default timeout, and no limit on
142 * messages returned. This consumer can operate in a logical group and is
143 * re-startable across sessions when you use the same group and ID on
147 * The host used in the URL to MR. Entries can be "host:port".
149 * The topic to consume
150 * @param consumerGroup
151 * The name of the consumer group this consumer is part of
153 * The unique id of this consume in its group
157 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
158 final String consumerId) {
159 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
163 * Create a consumer instance with the default timeout, and no limit on
164 * messages returned. This consumer can operate in a logical group and is
165 * re-startable across sessions when you use the same group and ID on
169 * The host used in the URL to MR. Entries can be "host:port".
171 * The topic to consume
172 * @param consumerGroup
173 * The name of the consumer group this consumer is part of
175 * The unique id of this consume in its group
177 * The amount of time in milliseconds that the server should keep
178 * the connection open while waiting for message traffic. Use -1
179 * for default timeout.
181 * A limit on the number of messages returned in a single call.
182 * Use -1 for no limit.
186 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
187 final String consumerId, int timeoutMs, int limit) {
188 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
192 * Create a consumer instance with the default timeout, and no limit on
193 * messages returned. This consumer can operate in a logical group and is
194 * re-startable across sessions when you use the same group and ID on
195 * restart. This consumer also uses server-side filtering.
198 * A comma separated list of hosts to use to connect to MR. You
199 * can include port numbers (3904 is the default)"
201 * The topic to consume
202 * @param consumerGroup
203 * The name of the consumer group this consumer is part of
205 * The unique id of this consume in its group
207 * The amount of time in milliseconds that the server should keep
208 * the connection open while waiting for message traffic. Use -1
209 * for default timeout.
211 * A limit on the number of messages returned in a single call.
212 * Use -1 for no limit.
214 * A Highland Park filter expression using only built-in filter
215 * components. Use null for "no filter".
219 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
220 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
221 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
222 filter, apiKey, apiSecret);
226 * Create a consumer instance with the default timeout, and no limit on
227 * messages returned. This consumer can operate in a logical group and is
228 * re-startable across sessions when you use the same group and ID on
229 * restart. This consumer also uses server-side filtering.
232 * The host used in the URL to MR. Entries can be "host:port".
234 * The topic to consume
235 * @param consumerGroup
236 * The name of the consumer group this consumer is part of
238 * The unique id of this consume in its group
240 * The amount of time in milliseconds that the server should keep
241 * the connection open while waiting for message traffic. Use -1
242 * for default timeout.
244 * A limit on the number of messages returned in a single call.
245 * Use -1 for no limit.
247 * A Highland Park filter expression using only built-in filter
248 * components. Use null for "no filter".
252 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
253 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
254 if (MRClientBuilders.sfConsumerMock != null)
255 return MRClientBuilders.sfConsumerMock;
257 return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
259 } catch (MalformedURLException e) {
260 throw new IllegalArgumentException(e);
264 /*************************************************************************/
265 /*************************************************************************/
266 /*************************************************************************/
269 * Create a publisher that sends each message (or group of messages)
270 * immediately. Most applications should favor higher latency for much
271 * higher message throughput and the "simple publisher" is not a good
275 * The host used in the URL to MR. Can be "host:port", can be
276 * multiple comma-separated entries.
278 * The topic on which to publish messages.
279 * @return a publisher
281 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
282 return createBatchingPublisher(hostlist, topic, 1, 1);
286 * Create a publisher that batches messages. Be sure to close the publisher
287 * to send the last batch and ensure a clean shutdown. Message payloads are
291 * The host used in the URL to MR. Can be "host:port", can be
292 * multiple comma-separated entries.
294 * The topic on which to publish messages.
295 * @param maxBatchSize
296 * The largest set of messages to batch
298 * The maximum age of a message waiting in a batch
300 * @return a publisher
302 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
304 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
308 * Create a publisher that batches messages. Be sure to close the publisher
309 * to send the last batch and ensure a clean shutdown.
312 * The host used in the URL to MR. Can be "host:port", can be
313 * multiple comma-separated entries.
315 * The topic on which to publish messages.
316 * @param maxBatchSize
317 * The largest set of messages to batch
319 * The maximum age of a message waiting in a batch
321 * use gzip compression
323 * @return a publisher
325 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
326 long maxAgeMs, boolean compress) {
327 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
331 * Create a publisher that batches messages. Be sure to close the publisher
332 * to send the last batch and ensure a clean shutdown.
335 * A set of hosts to be used in the URL to MR. Can be
336 * "host:port". Use multiple entries to enable failover.
338 * The topic on which to publish messages.
339 * @param maxBatchSize
340 * The largest set of messages to batch
342 * The maximum age of a message waiting in a batch
344 * use gzip compression
346 * @return a publisher
348 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
349 long maxAgeMs, boolean compress) {
350 final TreeSet<String> hosts = new TreeSet<>();
351 for (String hp : hostSet) {
354 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
358 * Create a publisher that batches messages. Be sure to close the publisher
359 * to send the last batch and ensure a clean shutdown.
362 * A set of hosts to be used in the URL to MR. Can be
363 * "host:port". Use multiple entries to enable failover.
365 * The topic on which to publish messages.
366 * @param maxBatchSize
367 * The largest set of messages to batch
369 * The maximum age of a message waiting in a batch
371 * use gzip compression
373 * @return a publisher
375 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
376 int maxBatchSize, long maxAgeMs, boolean compress) {
377 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
378 .compress(compress).build();
/**
 * Create a publisher that batches messages. Be sure to close the publisher
 * to send the last batch and ensure a clean shutdown.
 *
 * @param host
 *            A host to be used in the URL to MR. Can be "host:port". Use
 *            multiple entries to enable failover.
 * @param topic
 *            The topic on which to publish messages.
 * @param username
 *            The user name set on the publisher.
 * @param password
 *            The password set on the publisher.
 * @param maxBatchSize
 *            The largest set of messages to batch
 * @param maxAgeMs
 *            The maximum age of a message waiting in a batch
 * @param compress
 *            use gzip compression
 * @param protocolFlag
 *            http auth or ueb auth or dme2 method
 * @return MRBatchingPublisher obj
 */
// Builds an MRSimplerBatchPublisher for the given comma separated host list and
// topic, then sets the username, password and protocol flag on it.
// NOTE(review): original lines 411-412 and 416 onward are absent from this
// extract, so any statement between build() and setUsername(), as well as the
// return statement and closing brace, cannot be confirmed from here -- verify
// against the complete file before changing this method.
406 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
407 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
// Same builder chain as the Collection-based overload, fed from stringToList(host).
408 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
409 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
410 .compress(compress).build();
// Credentials and transport selection are applied after construction via setters.
413 pub.setUsername(username);
414 pub.setPassword(password);
415 pub.setProtocolFlag(protocolFlag);
420 * Create a publisher that batches messages. Be sure to close the publisher
421 * to send the last batch and ensure a clean shutdown
424 * props set all properties for publishing message
425 * @return MRBatchingPublisher obj
426 * @throws FileNotFoundException
428 * @throws IOException
431 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
432 throws FileNotFoundException, IOException {
433 return createInternalBatchingPublisher(props, withResponse);
437 * Create a publisher that batches messages. Be sure to close the publisher
438 * to send the last batch and ensure a clean shutdown
441 * props set all properties for publishing message
442 * @return MRBatchingPublisher obj
443 * @throws FileNotFoundException
445 * @throws IOException
448 public static MRBatchingPublisher createBatchingPublisher(Properties props)
449 throws FileNotFoundException, IOException {
450 return createInternalBatchingPublisher(props, false);
454 * Create a publisher that batches messages. Be sure to close the publisher
455 * to send the last batch and ensure a clean shutdown
457 * @param producerFilePath
458 * set all properties for publishing message
459 * @return MRBatchingPublisher obj
460 * @throws FileNotFoundException
462 * @throws IOException
465 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
466 throws FileNotFoundException, IOException {
467 FileReader reader = new FileReader(new File(producerFilePath));
468 Properties props = new Properties();
470 return createBatchingPublisher(props);
474 * Create a publisher that will contain send methods that return response
477 * @param producerFilePath
478 * set all properties for publishing message
479 * @return MRBatchingPublisher obj
480 * @throws FileNotFoundException
482 * @throws IOException
485 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
486 throws FileNotFoundException, IOException {
487 FileReader reader = new FileReader(new File(producerFilePath));
488 Properties props = new Properties();
490 return createBatchingPublisher(props, withResponse);
// Builds an MRSimplerBatchPublisher from a Properties object and applies host,
// credentials and transport type to it. Properties read here: "host",
// "ServiceName", "TransportType", "topic", "maxBatchSize", "maxAgeMs",
// "compress", "MessageSentThreadOccurance", plus the auth/user keys and, for
// DME2, "DME2preferredRouterFilePath".
// NOTE(review): this extract is missing original lines 497, 506, 514, 517, 522,
// 525, 527, 533 and 535 onward (branch headers, else/closing braces, at least
// one statement, and the return). Comments below describe only what is visible.
493 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
494 throws FileNotFoundException, IOException {
// Only evaluated when assertions are enabled (-ea); otherwise a null props
// fails on the first getProperty call below.
495 assert props != null;
496 MRSimplerBatchPublisher pub;
// First builder chain: includes .withResponse(withResponse).
// NOTE(review): presumably guarded by `if (withResponse)` on the missing line 497 -- confirm.
// Integer.parseInt throws NumberFormatException when a numeric property is absent.
498 pub = new MRSimplerBatchPublisher.Builder()
499 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
500 .onTopic(props.getProperty(TOPIC))
501 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
// getProperty already returns a String, so toString() is redundant; it also
// turns a missing "maxAgeMs" into a NullPointerException.
502 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
503 .compress(Boolean.parseBoolean(props.getProperty("compress")))
504 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
505 .withResponse(withResponse).build();
// Second builder chain: identical to the first except that withResponse is not set.
507 pub = new MRSimplerBatchPublisher.Builder()
508 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
509 .onTopic(props.getProperty(TOPIC))
510 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
511 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
512 .compress(Boolean.parseBoolean(props.getProperty("compress")))
513 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
515 pub.setHost(props.getProperty("host"));
// A missing "TransportType" property makes this equalsIgnoreCase call throw
// NullPointerException.
516 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
// AUTH_KEY transport: auth key/date are set in addition to username/password.
518 pub.setAuthKey(props.getProperty(AUTH_KEY));
519 pub.setAuthDate(props.getProperty(AUTH_DATE));
520 pub.setUsername(props.getProperty(USERNAME));
521 pub.setPassword(props.getProperty(PASSWORD));
// NOTE(review): presumably the else branch (line 522 missing): username/password only.
523 pub.setUsername(props.getProperty(USERNAME));
524 pub.setPassword(props.getProperty(PASSWORD));
526 pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
// Replaces the class-wide static `prop` with an empty Properties on every call.
528 prop = new Properties();
529 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
// DME2 transport: record the preferred-router file path and open it via static fields.
// The visible lines never close routeReader or routeWriter.
530 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
531 routeReader = new FileReader(new File(routeFilePath));
532 File fo = new File(routeFilePath);
// NOTE(review): line 533 is missing; if this FileWriter is unconditional it
// truncates the file just opened for reading -- confirm whether it is guarded.
534 routeWriter = new FileWriter(new File(routeFilePath));
541 * Create an identity manager client to work with API keys.
544 * A set of hosts to be used in the URL to MR. Can be
545 * "host:port". Use multiple entries to enable failover.
550 * @return an identity manager
552 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
553 MRIdentityManager cim;
555 cim = new MRMetaClient(hostSet);
556 } catch (MalformedURLException e) {
557 throw new IllegalArgumentException(e);
559 cim.setApiCredentials(apiKey, apiSecret);
564 * Create a topic manager for working with topics.
567 * A set of hosts to be used in the URL to MR. Can be
568 * "host:port". Use multiple entries to enable failover.
573 * @return a topic manager
575 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
578 tmi = new MRMetaClient(hostSet);
579 } catch (MalformedURLException e) {
580 throw new IllegalArgumentException(e);
582 tmi.setApiCredentials(apiKey, apiSecret);
587 * Inject a consumer. Used to support unit tests.
591 public static void $testInject(MRConsumer cc) {
592 MRClientBuilders.sfConsumerMock = cc;
// Creates an MRConsumerImpl for a comma separated host list and then sets the
// username, password, protocol flag and consumer file path on it.
// `i` and `j` are passed to the MRConsumerImpl constructor in the positions the
// other factory methods use for timeoutMs and limit; filter, apiKey and
// apiSecret are passed as null.
// NOTE(review): original lines 597-599, 603, 606 and 609 onward are absent from
// this extract (local declaration, try header, a line between setPassword and
// setProtocolFlag, the return) -- verify against the complete file.
595 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
596 String id, int i, int j, String protocalFlag, String consumerFilePath) {
600 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
// A malformed host URL is reported as an unchecked IllegalArgumentException.
601 } catch (MalformedURLException e) {
602 throw new IllegalArgumentException(e);
604 sub.setUsername(username);
605 sub.setPassword(password);
607 sub.setProtocolFlag(protocalFlag);
608 sub.setConsumerFilePath(consumerFilePath);
// Same visible construction as the overload that takes (…, int i, int j,
// String protocalFlag, String consumerFilePath); only the parameter order
// differs, with `i` and `j` last here. They are passed to the MRConsumerImpl
// constructor in the positions other factory methods use for timeoutMs and limit.
// NOTE(review): original lines 615-617, 621, 624 and 627 onward are absent from
// this extract (local declaration, try header, a line between setPassword and
// setProtocolFlag, the return) -- verify against the complete file.
613 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
614 String id, String protocalFlag, String consumerFilePath, int i, int j) {
618 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
// A malformed host URL is reported as an unchecked IllegalArgumentException.
619 } catch (MalformedURLException e) {
620 throw new IllegalArgumentException(e);
622 sub.setUsername(username);
623 sub.setPassword(password);
625 sub.setProtocolFlag(protocalFlag);
626 sub.setConsumerFilePath(consumerFilePath);
631 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
632 FileReader reader = new FileReader(new File(consumerFilePath));
633 Properties props = new Properties();
636 return createConsumer(props);
639 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
641 ValidatorUtil.validateSubscriber(props);
642 if (props.getProperty("timeout") != null)
643 timeout = Integer.parseInt(props.getProperty("timeout"));
647 if (props.getProperty("limit") != null)
648 limit = Integer.parseInt(props.getProperty("limit"));
652 if (props.getProperty("group") == null)
653 group = UUID.randomUUID().toString();
655 group = props.getProperty("group");
656 MRConsumerImpl sub = null;
657 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
658 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
659 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
660 props.getProperty(AUTH_KEY), props.getProperty(AUTH_DATE));
661 sub.setAuthKey(props.getProperty(AUTH_KEY));
662 sub.setAuthDate(props.getProperty(AUTH_DATE));
663 sub.setUsername(props.getProperty(USERNAME));
664 sub.setPassword(props.getProperty(PASSWORD));
666 sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty(TOPIC),
667 group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
668 props.getProperty(USERNAME), props.getProperty(PASSWORD));
669 sub.setUsername(props.getProperty(USERNAME));
670 sub.setPassword(props.getProperty(PASSWORD));
674 sub.setHost(props.getProperty("host"));
675 sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
676 sub.setfFilter(props.getProperty("filter"));
677 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
678 MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
679 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
680 routeReader = new FileReader(new File(routeFilePath));
681 prop = new Properties();
682 File fo = new File(routeFilePath);
684 routeWriter = new FileWriter(new File(routeFilePath));