1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 *******************************************************************************/
24 package org.onap.dmaap.mr.client;
26 import java.net.MalformedURLException;
27 import java.util.Collection;
28 import java.util.TreeSet;
29 import java.util.UUID;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
35 import org.onap.dmaap.mr.client.impl.MRMetaClient;
36 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
39 * A collection of builders for various types of MR API clients
43 public class MRClientBuilders
47 * Instantiates MRClientBuilders.
49 private MRClientBuilders() {
50 // prevent instantiation
54 * A builder for a topic Consumer
57 public static class ConsumerBuilder
60 * Construct a consumer builder.
62 public ConsumerBuilder () {}
66 * @param hostList a comma-separated list of hosts to use to connect to MR
67 * @return this builder
69 public ConsumerBuilder usingHosts ( String hostList ) {
70 return usingHosts ( MRConsumerImpl.stringToList(hostList) );
75 * @param hostSet a set of hosts to use to connect to MR
76 * @return this builder
78 public ConsumerBuilder usingHosts ( Collection<String> hostSet ) {
79 fHosts = hostSet; return this;
84 * @param topic the name of the topic to consume
85 * @return this builder
87 public ConsumerBuilder onTopic ( String topic ) {
93 * Set the consumer's group and ID
94 * @param consumerGroup The name of the consumer group this consumer is part of
95 * @param consumerId The unique id of this consumer in its group
96 * @return this builder
98 public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) {
99 fGroup = consumerGroup;
105 * Set the API key and secret for this client.
108 * @return this builder
110 public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) {
112 fApiSecret = apiSecret;
117 * Set the server side timeout
118 * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
119 * @return this builder
121 public ConsumerBuilder waitAtServer ( int timeoutMs ) {
122 fTimeoutMs = timeoutMs;
127 * Set the maximum number of messages to receive per transaction
128 * @param limit The maximum number of messages to receive from the server in one transaction.
129 * @return this builder
131 public ConsumerBuilder receivingAtMost ( int limit ) {
137 * Set a filter to use on the server
138 * @param filter a Highland Park standard library filter encoded in JSON
139 * @return this builder
141 public ConsumerBuilder withServerSideFilter ( String filter ) {
150 public MRConsumer build ()
152 if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
154 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
157 if ( fGroup == null )
159 fGroup = UUID.randomUUID ().toString ();
161 log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
164 if ( sfConsumerMock != null ) return sfConsumerMock;
166 return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(fHosts)
167 .setTopic(fTopic).setConsumerGroup(fGroup).setConsumerId(fId)
168 .setTimeoutMs(fTimeoutMs).setLimit(fLimit).setFilter(fFilter)
169 .setApiKey_username(fApiKey).setApiSecret_password(fApiSecret)
170 .createMRConsumerImpl();
171 } catch (MalformedURLException e) {
172 throw new IllegalArgumentException(e);
176 private Collection<String> fHosts = null;
177 private String fTopic = null;
178 private String fGroup = null;
179 private String fId = null;
180 private String fApiKey = null;
181 private String fApiSecret = null;
182 private int fTimeoutMs = -1;
183 private int fLimit = -1;
184 private String fFilter = null;
187 /*************************************************************************/
188 /*************************************************************************/
189 /*************************************************************************/
192 * A publisher builder
195 public static class PublisherBuilder
197 public PublisherBuilder () {}
200 * Set the MR/UEB host(s) to use
201 * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
202 * @return this builder
204 public PublisherBuilder usingHosts ( String hostlist ) {
205 return usingHosts ( MRConsumerImpl.stringToList(hostlist) );
209 * Set the MR/UEB host(s) to use
210 * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
211 * @return this builder
213 public PublisherBuilder usingHosts ( String[] hostSet )
215 final TreeSet<String> hosts = new TreeSet<> ();
216 for ( String hp : hostSet )
220 return usingHosts ( hosts );
224 * Set the MR/UEB host(s) to use
225 * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
226 * @return this builder
228 public PublisherBuilder usingHosts ( Collection<String> hostlist ) {
234 * Set the topic to publish on
235 * @param topic The topic on which to publish messages.
236 * @return this builder
238 public PublisherBuilder onTopic ( String topic ) {
244 * Batch message sends with the given limits.
245 * @param messageCount The largest set of messages to batch.
246 * @param ageInMs The maximum age of a message waiting in a batch.
247 * @return this builder
249 public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) {
250 fMaxBatchSize = messageCount;
251 fMaxBatchAgeMs = ageInMs;
256 * Compress transactions
257 * @return this builder
259 public PublisherBuilder withCompresion () {
260 return enableCompresion(true);
264 * Do not compress transactions
265 * @return this builder
267 public PublisherBuilder withoutCompresion () {
268 return enableCompresion(false);
272 * Set the compression option
273 * @param compress true to gzip compress transactions
274 * @return this builder
276 public PublisherBuilder enableCompresion ( boolean compress ) {
277 fCompress = compress;
282 * Set the API key and secret for this client.
285 * @return this builder
287 public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) {
289 fApiSecret = apiSecret;
294 * Build the publisher
295 * @return a batching publisher
297 public MRBatchingPublisher build ()
299 if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
301 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
304 if ( sfPublisherMock != null ) return sfPublisherMock;
306 final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
307 againstUrls ( fHosts ).
309 batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
310 compress ( fCompress ).
312 if ( fApiKey != null )
314 pub.setApiCredentials ( fApiKey, fApiSecret );
319 private Collection<String> fHosts = null;
320 private String fTopic = null;
321 private int fMaxBatchSize = 1;
322 private int fMaxBatchAgeMs = 1;
323 private boolean fCompress = false;
324 private String fApiKey = null;
325 private String fApiSecret = null;
329 * A builder for an identity manager
332 public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
335 * Construct an identity manager builder.
337 public IdentityManagerBuilder () {}
340 protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
341 return new MRMetaClient ( hosts );
342 } catch (MalformedURLException e) {
343 throw new IllegalArgumentException(e);
348 * A builder for a topic manager
351 public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
354 * Construct an topic manager builder.
356 public TopicManagerBuilder () {}
359 protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
360 return new MRMetaClient ( hosts );
361 } catch (MalformedURLException e) {
362 throw new IllegalArgumentException(e);
367 * Inject a consumer. Used to support unit tests.
370 public static void $testInject ( MRConsumer cc )
376 * Inject a publisher. Used to support unit tests.
379 public static void $testInject ( MRBatchingPublisher pub )
381 sfPublisherMock = pub;
384 static MRConsumer sfConsumerMock = null;
385 static MRBatchingPublisher sfPublisherMock = null;
388 * A builder for an identity manager
391 public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
394 * Construct an identity manager builder.
396 public AbstractAuthenticatedManagerBuilder () {}
400 * @param hostList a comma-separated list of hosts to use to connect to MR
401 * @return this builder
403 public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) {
404 return usingHosts ( MRConsumerImpl.stringToList(hostList) );
409 * @param hostSet a set of hosts to use to connect to MR
410 * @return this builder
412 public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) {
418 * Set the API key and secret for this client.
421 * @return this builder
423 public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) {
425 fApiSecret = apiSecret;
435 if ( fHosts.isEmpty() )
437 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
440 final T mgr = constructClient ( fHosts );
441 mgr.setApiCredentials ( fApiKey, fApiSecret );
445 protected abstract T constructClient ( Collection<String> hosts );
447 private Collection<String> fHosts = null;
448 private String fApiKey = null;
449 private String fApiSecret = null;
452 private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );