1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
24 package org.onap.dmaap.mr.client;
27 import java.io.FileNotFoundException;
28 import java.io.FileReader;
29 import java.io.FileWriter;
30 import java.io.IOException;
31 import java.net.MalformedURLException;
32 import java.util.Collection;
34 import java.util.Properties;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import javax.ws.rs.core.MultivaluedMap;
38 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
39 import org.onap.dmaap.mr.client.impl.MRMetaClient;
40 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
41 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
42 import org.onap.dmaap.mr.tools.ValidatorUtil;
45 * A factory for MR clients.<br/>
47 * Use caution selecting a consumer creator factory. If the call doesn't accept
48 * a consumer group name, then it creates a consumer that is not restartable.
49 * That is, if you stop your process and start it again, your client will NOT
50 * receive any missed messages on the topic. If you need to ensure receipt of
51 * missed messages, then you must use a consumer that's created with a group
52 * name and ID. (If you create multiple consumer processes using the same group,
53 * load is split across them. Be sure to use a different ID for each
60 public class MRClientFactory {
61 private static final String AUTH_KEY = "authKey";
62 private static final String AUTH_DATE = "authDate";
63 private static final String PASSWORD = "password";
64 private static final String USERNAME = "username";
65 private static final String DME2PREFERRED_ROUTER_FILE_PATH = "DME2preferredRouterFilePath";
66 private static final String TOPIC = "topic";
67 private static final String TRANSPORT_TYPE = "TransportType";
68 public static MultivaluedMap<String, Object> HTTPHeadersMap;
69 public static Map<String, String> DME2HeadersMap;
70 public static String routeFilePath;
72 public static FileReader routeReader;
74 public static FileWriter routeWriter = null;
75 public static Properties prop = null;
78 * Instantiates MRClientFactory.
80 private MRClientFactory() {
81 //prevents instantiation.
85 * Create a consumer instance with the default timeout and no limit on
86 * messages returned. This consumer operates as an independent consumer
87 * (i.e., not in a group) and is NOT re-startable across sessions.
90 * A comma separated list of hosts to use to connect to MR. You
91 * can include port numbers (3904 is the default). For example,
95 * The topic to consume
99 public static MRConsumer createConsumer(String hostList, String topic) {
100 return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
104 * Create a consumer instance with the default timeout and no limit on
105 * messages returned. This consumer operates as an independent consumer
106 * (i.e., not in a group) and is NOT re-startable across sessions.
109 * The host used in the URL to MR. Entries can be "host:port".
111 * The topic to consume
115 public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
116 return createConsumer(hostSet, topic, null);
120 * Create a consumer instance with server-side filtering, the default
121 * timeout, and no limit on messages returned. This consumer operates as an
122 * independent consumer (i.e., not in a group) and is NOT re-startable
126 * The host used in the URL to MR. Entries can be "host:port".
128 * The topic to consume
130 * a filter to use on the server side
134 public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
135 return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
139 * Create a consumer instance with the default timeout, and no limit on
140 * messages returned. This consumer can operate in a logical group and is
141 * re-startable across sessions when you use the same group and ID on
145 * The host used in the URL to MR. Entries can be "host:port".
147 * The topic to consume
148 * @param consumerGroup
149 * The name of the consumer group this consumer is part of
151 * The unique id of this consume in its group
155 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
156 final String consumerId) {
157 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
161 * Create a consumer instance with the default timeout, and no limit on
162 * messages returned. This consumer can operate in a logical group and is
163 * re-startable across sessions when you use the same group and ID on
167 * The host used in the URL to MR. Entries can be "host:port".
169 * The topic to consume
170 * @param consumerGroup
171 * The name of the consumer group this consumer is part of
173 * The unique id of this consume in its group
175 * The amount of time in milliseconds that the server should keep
176 * the connection open while waiting for message traffic. Use -1
177 * for default timeout.
179 * A limit on the number of messages returned in a single call.
180 * Use -1 for no limit.
184 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
185 final String consumerId, int timeoutMs, int limit) {
186 return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
190 * Create a consumer instance with the default timeout, and no limit on
191 * messages returned. This consumer can operate in a logical group and is
192 * re-startable across sessions when you use the same group and ID on
193 * restart. This consumer also uses server-side filtering.
196 * A comma separated list of hosts to use to connect to MR. You
197 * can include port numbers (3904 is the default)"
199 * The topic to consume
200 * @param consumerGroup
201 * The name of the consumer group this consumer is part of
203 * The unique id of this consume in its group
205 * The amount of time in milliseconds that the server should keep
206 * the connection open while waiting for message traffic. Use -1
207 * for default timeout.
209 * A limit on the number of messages returned in a single call.
210 * Use -1 for no limit.
212 * A Highland Park filter expression using only built-in filter
213 * components. Use null for "no filter".
217 public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
218 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
219 return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
220 filter, apiKey, apiSecret);
224 * Create a consumer instance with the default timeout, and no limit on
225 * messages returned. This consumer can operate in a logical group and is
226 * re-startable across sessions when you use the same group and ID on
227 * restart. This consumer also uses server-side filtering.
230 * The host used in the URL to MR. Entries can be "host:port".
232 * The topic to consume
233 * @param consumerGroup
234 * The name of the consumer group this consumer is part of
236 * The unique id of this consume in its group
238 * The amount of time in milliseconds that the server should keep
239 * the connection open while waiting for message traffic. Use -1
240 * for default timeout.
242 * A limit on the number of messages returned in a single call.
243 * Use -1 for no limit.
245 * A Highland Park filter expression using only built-in filter
246 * components. Use null for "no filter".
250 public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
251 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
252 if (MRClientBuilders.sfConsumerMock != null)
253 return MRClientBuilders.sfConsumerMock;
255 return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(hostSet).setTopic(topic)
256 .setConsumerGroup(consumerGroup).setConsumerId(consumerId)
257 .setTimeoutMs(timeoutMs).setLimit(limit).setFilter(filter)
258 .setApiKey_username(apiKey).setApiSecret_password(apiSecret)
259 .createMRConsumerImpl();
260 } catch (MalformedURLException e) {
261 throw new IllegalArgumentException(e);
265 /*************************************************************************/
266 /*************************************************************************/
267 /*************************************************************************/
270 * Create a publisher that sends each message (or group of messages)
271 * immediately. Most applications should favor higher latency for much
272 * higher message throughput and the "simple publisher" is not a good
276 * The host used in the URL to MR. Can be "host:port", can be
277 * multiple comma-separated entries.
279 * The topic on which to publish messages.
280 * @return a publisher
282 public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
283 return createBatchingPublisher(hostlist, topic, 1, 1);
287 * Create a publisher that batches messages. Be sure to close the publisher
288 * to send the last batch and ensure a clean shutdown. Message payloads are
292 * The host used in the URL to MR. Can be "host:port", can be
293 * multiple comma-separated entries.
295 * The topic on which to publish messages.
296 * @param maxBatchSize
297 * The largest set of messages to batch
299 * The maximum age of a message waiting in a batch
301 * @return a publisher
303 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
305 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
309 * Create a publisher that batches messages. Be sure to close the publisher
310 * to send the last batch and ensure a clean shutdown.
313 * The host used in the URL to MR. Can be "host:port", can be
314 * multiple comma-separated entries.
316 * The topic on which to publish messages.
317 * @param maxBatchSize
318 * The largest set of messages to batch
320 * The maximum age of a message waiting in a batch
322 * use gzip compression
324 * @return a publisher
326 public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
327 long maxAgeMs, boolean compress) {
328 return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
332 * Create a publisher that batches messages. Be sure to close the publisher
333 * to send the last batch and ensure a clean shutdown.
336 * A set of hosts to be used in the URL to MR. Can be
337 * "host:port". Use multiple entries to enable failover.
339 * The topic on which to publish messages.
340 * @param maxBatchSize
341 * The largest set of messages to batch
343 * The maximum age of a message waiting in a batch
345 * use gzip compression
347 * @return a publisher
349 public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
350 long maxAgeMs, boolean compress) {
351 final TreeSet<String> hosts = new TreeSet<>();
352 for (String hp : hostSet) {
355 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
359 * Create a publisher that batches messages. Be sure to close the publisher
360 * to send the last batch and ensure a clean shutdown.
363 * A set of hosts to be used in the URL to MR. Can be
364 * "host:port". Use multiple entries to enable failover.
366 * The topic on which to publish messages.
367 * @param maxBatchSize
368 * The largest set of messages to batch
370 * The maximum age of a message waiting in a batch
372 * use gzip compression
374 * @return a publisher
376 public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
377 int maxBatchSize, long maxAgeMs, boolean compress) {
378 return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
379 .compress(compress).build();
383 * Create a publisher that batches messages. Be sure to close the publisher
384 * to send the last batch and ensure a clean shutdown.
387 * A host to be used in the URL to MR. Can be "host:port". Use
388 * multiple entries to enable failover.
390 * The topic on which to publish messages.
395 * @param maxBatchSize
396 * The largest set of messages to batch
398 * The maximum age of a message waiting in a batch
400 * use gzip compression
401 * @param protocolFlag
402 * http auth or ueb auth or dme2 method
403 * @param producerFilePath
404 * all properties for publisher
405 * @return MRBatchingPublisher obj
407 public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
408 final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag) {
409 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
410 .againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
411 .compress(compress).build();
414 pub.setUsername(username);
415 pub.setPassword(password);
416 pub.setProtocolFlag(protocolFlag);
421 * Create a publisher that batches messages. Be sure to close the publisher
422 * to send the last batch and ensure a clean shutdown
425 * props set all properties for publishing message
426 * @return MRBatchingPublisher obj
427 * @throws FileNotFoundException
429 * @throws IOException
432 public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
433 throws FileNotFoundException, IOException {
434 return createInternalBatchingPublisher(props, withResponse);
438 * Create a publisher that batches messages. Be sure to close the publisher
439 * to send the last batch and ensure a clean shutdown
442 * props set all properties for publishing message
443 * @return MRBatchingPublisher obj
444 * @throws FileNotFoundException
446 * @throws IOException
449 public static MRBatchingPublisher createBatchingPublisher(Properties props)
450 throws FileNotFoundException, IOException {
451 return createInternalBatchingPublisher(props, false);
455 * Create a publisher that batches messages. Be sure to close the publisher
456 * to send the last batch and ensure a clean shutdown
458 * @param producerFilePath
459 * set all properties for publishing message
460 * @return MRBatchingPublisher obj
461 * @throws FileNotFoundException
463 * @throws IOException
466 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
467 throws FileNotFoundException, IOException {
468 FileReader reader = new FileReader(new File(producerFilePath));
469 Properties props = new Properties();
471 return createBatchingPublisher(props);
475 * Create a publisher that will contain send methods that return response
478 * @param producerFilePath
479 * set all properties for publishing message
480 * @return MRBatchingPublisher obj
481 * @throws FileNotFoundException
483 * @throws IOException
486 public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
487 throws FileNotFoundException, IOException {
488 FileReader reader = new FileReader(new File(producerFilePath));
489 Properties props = new Properties();
491 return createBatchingPublisher(props, withResponse);
494 protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
495 throws FileNotFoundException, IOException {
496 assert props != null;
497 MRSimplerBatchPublisher pub;
499 pub = new MRSimplerBatchPublisher.Builder()
500 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
501 .onTopic(props.getProperty(TOPIC))
502 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
503 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
504 .compress(Boolean.parseBoolean(props.getProperty("compress")))
505 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
506 .withResponse(withResponse).build();
508 pub = new MRSimplerBatchPublisher.Builder()
509 .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty(TRANSPORT_TYPE))
510 .onTopic(props.getProperty(TOPIC))
511 .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
512 Integer.parseInt(props.getProperty("maxAgeMs").toString()))
513 .compress(Boolean.parseBoolean(props.getProperty("compress")))
514 .httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
516 pub.setHost(props.getProperty("host"));
517 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
519 pub.setAuthKey(props.getProperty(AUTH_KEY));
520 pub.setAuthDate(props.getProperty(AUTH_DATE));
521 pub.setUsername(props.getProperty(USERNAME));
522 pub.setPassword(props.getProperty(PASSWORD));
524 pub.setUsername(props.getProperty(USERNAME));
525 pub.setPassword(props.getProperty(PASSWORD));
527 pub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
529 prop = new Properties();
530 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
531 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
532 routeReader = new FileReader(new File(routeFilePath));
533 File fo = new File(routeFilePath);
535 routeWriter = new FileWriter(new File(routeFilePath));
542 * Create an identity manager client to work with API keys.
545 * A set of hosts to be used in the URL to MR. Can be
546 * "host:port". Use multiple entries to enable failover.
551 * @return an identity manager
553 public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
554 MRIdentityManager cim;
556 cim = new MRMetaClient(hostSet);
557 } catch (MalformedURLException e) {
558 throw new IllegalArgumentException(e);
560 cim.setApiCredentials(apiKey, apiSecret);
565 * Create a topic manager for working with topics.
568 * A set of hosts to be used in the URL to MR. Can be
569 * "host:port". Use multiple entries to enable failover.
574 * @return a topic manager
576 public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
579 tmi = new MRMetaClient(hostSet);
580 } catch (MalformedURLException e) {
581 throw new IllegalArgumentException(e);
583 tmi.setApiCredentials(apiKey, apiSecret);
588 * Inject a consumer. Used to support unit tests.
592 public static void $testInject(MRConsumer cc) {
593 MRClientBuilders.sfConsumerMock = cc;
596 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
597 String id, int i, int j, String protocalFlag, String consumerFilePath) {
601 sub = new MRConsumerImpl.MRConsumerImplBuilder()
602 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
603 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
604 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
605 .createMRConsumerImpl();
606 } catch (MalformedURLException e) {
607 throw new IllegalArgumentException(e);
609 sub.setUsername(username);
610 sub.setPassword(password);
612 sub.setProtocolFlag(protocalFlag);
613 sub.setConsumerFilePath(consumerFilePath);
618 public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
619 String id, String protocalFlag, String consumerFilePath, int i, int j) {
623 sub = new MRConsumerImpl.MRConsumerImplBuilder()
624 .setHostPart(MRConsumerImpl.stringToList(host)).setTopic(topic)
625 .setConsumerGroup(group).setConsumerId(id).setTimeoutMs(i).setLimit(j)
626 .setFilter(null).setApiKey_username(null).setApiSecret_password(null)
627 .createMRConsumerImpl();
628 } catch (MalformedURLException e) {
629 throw new IllegalArgumentException(e);
631 sub.setUsername(username);
632 sub.setPassword(password);
634 sub.setProtocolFlag(protocalFlag);
635 sub.setConsumerFilePath(consumerFilePath);
640 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
641 FileReader reader = new FileReader(new File(consumerFilePath));
642 Properties props = new Properties();
645 return createConsumer(props);
648 public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
650 ValidatorUtil.validateSubscriber(props);
651 if (props.getProperty("timeout") != null)
652 timeout = Integer.parseInt(props.getProperty("timeout"));
656 if (props.getProperty("limit") != null)
657 limit = Integer.parseInt(props.getProperty("limit"));
661 if (props.getProperty("group") == null)
662 group = UUID.randomUUID().toString();
664 group = props.getProperty("group");
665 MRConsumerImpl sub = null;
666 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
667 sub = new MRConsumerImpl.MRConsumerImplBuilder()
668 .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host")))
669 .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
670 .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
671 .setFilter(props.getProperty("filter"))
672 .setApiKey_username(props.getProperty(AUTH_KEY))
673 .setApiSecret_password(props.getProperty(AUTH_DATE)).createMRConsumerImpl();
674 sub.setAuthKey(props.getProperty(AUTH_KEY));
675 sub.setAuthDate(props.getProperty(AUTH_DATE));
676 sub.setUsername(props.getProperty(USERNAME));
677 sub.setPassword(props.getProperty(PASSWORD));
679 sub = new MRConsumerImpl.MRConsumerImplBuilder()
680 .setHostPart(MRConsumerImpl.stringToList(props.getProperty("host")))
681 .setTopic(props.getProperty(TOPIC)).setConsumerGroup(group)
682 .setConsumerId(props.getProperty("id")).setTimeoutMs(timeout).setLimit(limit)
683 .setFilter(props.getProperty("filter"))
684 .setApiKey_username(props.getProperty(USERNAME))
685 .setApiSecret_password(props.getProperty(PASSWORD)).createMRConsumerImpl();
686 sub.setUsername(props.getProperty(USERNAME));
687 sub.setPassword(props.getProperty(PASSWORD));
691 sub.setHost(props.getProperty("host"));
692 sub.setProtocolFlag(props.getProperty(TRANSPORT_TYPE));
693 sub.setfFilter(props.getProperty("filter"));
694 if (props.getProperty(TRANSPORT_TYPE).equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
695 MRConsumerImpl.setRouterFilePath(props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH));
696 routeFilePath = props.getProperty(DME2PREFERRED_ROUTER_FILE_PATH);
697 routeReader = new FileReader(new File(routeFilePath));
698 prop = new Properties();
699 File fo = new File(routeFilePath);
701 routeWriter = new FileWriter(new File(routeFilePath));