1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client;
24 import java.net.MalformedURLException;
25 import java.util.Collection;
26 import java.util.TreeSet;
27 import java.util.UUID;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 import com.att.nsa.mr.client.impl.MRConsumerImpl;
33 import com.att.nsa.mr.client.impl.MRMetaClient;
34 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
37 * A collection of builders for various types of MR API clients
41 public class MRClientBuilders
44 * A builder for a topic Consumer
47 public static class ConsumerBuilder
50 * Construct a consumer builder.
52 public ConsumerBuilder () {}
56 * @param hostList a comma-separated list of hosts to use to connect to MR
57 * @return this builder
59 public ConsumerBuilder usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
63 * @param hostSet a set of hosts to use to connect to MR
64 * @return this builder
66 public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; }
70 * @param topic the name of the topic to consume
71 * @return this builder
73 public ConsumerBuilder onTopic ( String topic ) { fTopic=topic; return this; }
76 * Set the consumer's group and ID
77 * @param consumerGroup The name of the consumer group this consumer is part of
78 * @param consumerId The unique id of this consumer in its group
79 * @return this builder
81 public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { fGroup = consumerGroup; fId = consumerId; return this; }
84 * Set the API key and secret for this client.
87 * @return this builder
89 public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
92 * Set the server side timeout
93 * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
94 * @return this builder
96 public ConsumerBuilder waitAtServer ( int timeoutMs ) { fTimeoutMs = timeoutMs; return this; };
99 * Set the maximum number of messages to receive per transaction
100 * @param limit The maximum number of messages to receive from the server in one transaction.
101 * @return this builder
103 public ConsumerBuilder receivingAtMost ( int limit ) { fLimit = limit; return this; };
106 * Set a filter to use on the server
107 * @param filter a Highland Park standard library filter encoded in JSON
108 * @return this builder
110 public ConsumerBuilder withServerSideFilter ( String filter ) { fFilter = filter; return this; }
116 public MRConsumer build ()
118 if ( fHosts == null || fHosts.size() == 0 || fTopic == null )
120 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
123 if ( fGroup == null )
125 fGroup = UUID.randomUUID ().toString ();
127 log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
130 if ( sfConsumerMock != null ) return sfConsumerMock;
132 return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
133 } catch (MalformedURLException e) {
134 throw new IllegalArgumentException(e);
138 private Collection<String> fHosts = null;
139 private String fTopic = null;
140 private String fGroup = null;
141 private String fId = null;
142 private String fApiKey = null;
143 private String fApiSecret = null;
144 private int fTimeoutMs = -1;
145 private int fLimit = -1;
146 private String fFilter = null;
149 /*************************************************************************/
150 /*************************************************************************/
151 /*************************************************************************/
154 * A publisher builder
157 public static class PublisherBuilder
159 public PublisherBuilder () {}
162 * Set the MR/UEB host(s) to use
163 * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
164 * @return this builder
166 public PublisherBuilder usingHosts ( String hostlist ) { return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); }
169 * Set the MR/UEB host(s) to use
170 * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
171 * @return this builder
173 public PublisherBuilder usingHosts ( String[] hostSet )
175 final TreeSet<String> hosts = new TreeSet<String> ();
176 for ( String hp : hostSet )
180 return usingHosts ( hosts );
184 * Set the MR/UEB host(s) to use
185 * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
186 * @return this builder
188 public PublisherBuilder usingHosts ( Collection<String> hostlist ) { fHosts=hostlist; return this; }
191 * Set the topic to publish on
192 * @param topic The topic on which to publish messages.
193 * @return this builder
195 public PublisherBuilder onTopic ( String topic ) { fTopic = topic; return this; }
198 * Batch message sends with the given limits.
199 * @param messageCount The largest set of messages to batch.
200 * @param ageInMs The maximum age of a message waiting in a batch.
201 * @return this builder
203 public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { fMaxBatchSize = messageCount; fMaxBatchAgeMs = ageInMs; return this; }
206 * Compress transactions
207 * @return this builder
209 public PublisherBuilder withCompresion () { return enableCompresion(true); }
212 * Do not compress transactions
213 * @return this builder
215 public PublisherBuilder withoutCompresion () { return enableCompresion(false); }
218 * Set the compression option
219 * @param compress true to gzip compress transactions
220 * @return this builder
222 public PublisherBuilder enableCompresion ( boolean compress ) { fCompress = compress; return this; }
225 * Set the API key and secret for this client.
228 * @return this builder
230 public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
233 * Build the publisher
234 * @return a batching publisher
236 public MRBatchingPublisher build ()
238 if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
240 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
243 if ( sfPublisherMock != null ) return sfPublisherMock;
245 final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
246 againstUrls ( fHosts ).
248 batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
249 compress ( fCompress ).
251 if ( fApiKey != null )
253 pub.setApiCredentials ( fApiKey, fApiSecret );
258 private Collection<String> fHosts = null;
259 private String fTopic = null;
260 private int fMaxBatchSize = 1;
261 private int fMaxBatchAgeMs = 1;
262 private boolean fCompress = false;
263 private String fApiKey = null;
264 private String fApiSecret = null;
268 * A builder for an identity manager
271 public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
274 * Construct an identity manager builder.
276 public IdentityManagerBuilder () {}
279 protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
280 return new MRMetaClient ( hosts );
281 } catch (MalformedURLException e) {
282 throw new IllegalArgumentException(e);
287 * A builder for a topic manager
290 public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
293 * Construct an topic manager builder.
295 public TopicManagerBuilder () {}
298 protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
299 return new MRMetaClient ( hosts );
300 } catch (MalformedURLException e) {
301 throw new IllegalArgumentException(e);
306 * Inject a consumer. Used to support unit tests.
309 public static void $testInject ( MRConsumer cc )
315 * Inject a publisher. Used to support unit tests.
318 public static void $testInject ( MRBatchingPublisher pub )
320 sfPublisherMock = pub;
323 static MRConsumer sfConsumerMock = null;
324 static MRBatchingPublisher sfPublisherMock = null;
327 * A builder for an identity manager
330 public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
333 * Construct an identity manager builder.
335 public AbstractAuthenticatedManagerBuilder () {}
339 * @param hostList a comma-separated list of hosts to use to connect to MR
340 * @return this builder
342 public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
346 * @param hostSet a set of hosts to use to connect to MR
347 * @return this builder
349 public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; }
352 * Set the API key and secret for this client.
355 * @return this builder
357 public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
365 if ( fHosts.isEmpty() )
367 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
370 final T mgr = constructClient ( fHosts );
371 mgr.setApiCredentials ( fApiKey, fApiSecret );
375 protected abstract T constructClient ( Collection<String> hosts );
377 private Collection<String> fHosts = null;
378 private String fApiKey = null;
379 private String fApiSecret = null;
// Class-wide SLF4J logger; ConsumerBuilder.build() logs through it when it generates a group name.
382 private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );