1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2018 IBM.
8 * Modifications Copyright © 2021 Orange.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 *******************************************************************************/
26 package org.onap.dmaap.mr.client;
28 import java.net.MalformedURLException;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.TreeSet;
32 import java.util.UUID;
33 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
34 import org.onap.dmaap.mr.client.impl.MRMetaClient;
35 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
/**
 * A collection of builders for various types of MR API clients.
 */
44 public class MRClientBuilders {
45 private static final Logger logger = LoggerFactory.getLogger(MRClientBuilders.class);
47 private static final String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name.";
/**
 * Instantiates MRClientBuilders.
 *
 * <p>Private: this class is only a holder for the nested builder classes.
 */
private MRClientBuilders() {
    // prevent instantiation
}
57 * A builder for a topic Consumer.
61 public static class ConsumerBuilder {
66 * @param hostList a comma-separated list of hosts to use to connect to MR
67 * @return this builder
69 public ConsumerBuilder usingHosts(String hostList) {
70 return usingHosts(MRConsumerImpl.stringToList(hostList));
76 * @param hostSet a set of hosts to use to connect to MR
77 * @return this builder
79 public ConsumerBuilder usingHosts(Collection<String> hostSet) {
87 * @param topic the name of the topic to consume
88 * @return this builder
90 public ConsumerBuilder onTopic(String topic) {
96 * Set the consumer's group and ID.
98 * @param consumerGroup The name of the consumer group this consumer is part of
99 * @param consumerId The unique id of this consumer in its group
100 * @return this builder
102 public ConsumerBuilder knownAs(String consumerGroup, String consumerId) {
103 fGroup = consumerGroup;
109 * Set the API key and secret for this client.
113 * @return this builder
115 public ConsumerBuilder authenticatedBy(String apiKey, String apiSecret) {
117 fApiSecret = apiSecret;
122 * Set the server side timeout.
124 * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
125 * @return this builder
127 public ConsumerBuilder waitAtServer(int timeoutMs) {
128 fTimeoutMs = timeoutMs;
133 * Set the maximum number of messages to receive per transaction.
135 * @param limit The maximum number of messages to receive from the server in one transaction.
136 * @return this builder
138 public ConsumerBuilder receivingAtMost(int limit) {
144 * Set a filter to use on the server.
146 * @param filter a Highland Park standard library filter encoded in JSON
147 * @return this builder
149 public ConsumerBuilder withServerSideFilter(String filter) {
155 * Build the consumer.
159 public MRConsumer build() {
160 if (fHosts == null || fHosts.isEmpty() || fTopic == null) {
161 throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
164 if (fGroup == null) {
165 fGroup = UUID.randomUUID().toString();
167 logger.info("Creating non-restartable client with group {} and ID {}.", fGroup, fId);
170 if (sfConsumerMock != null) {
171 return sfConsumerMock;
174 return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(fHosts)
175 .setTopic(fTopic).setConsumerGroup(fGroup).setConsumerId(fId)
176 .setTimeoutMs(fTimeoutMs).setLimit(fLimit).setFilter(fFilter)
177 .setApiKey_username(fApiKey).setApiSecret_password(fApiSecret)
178 .createMRConsumerImpl();
179 } catch (MalformedURLException e) {
180 throw new IllegalArgumentException(e);
184 private Collection<String> fHosts = null;
185 private String fTopic = null;
186 private String fGroup = null;
187 private String fId = null;
188 private String fApiKey = null;
189 private String fApiSecret = null;
190 private int fTimeoutMs = -1;
191 private int fLimit = -1;
192 private String fFilter = null;
195 //*************************************************************************/
196 //*************************************************************************/
197 //*************************************************************************/
200 * A publisher builder.
204 public static class PublisherBuilder {
207 * Set the MR/UEB host(s) to use.
209 * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
210 * @return this builder
212 public PublisherBuilder usingHosts(String hostlist) {
213 return usingHosts(MRConsumerImpl.stringToList(hostlist));
217 * Set the MR/UEB host(s) to use.
219 * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
220 * @return this builder
222 public PublisherBuilder usingHosts(String[] hostSet) {
223 final TreeSet<String> hosts = new TreeSet<>();
224 Collections.addAll(hosts, hostSet);
225 return usingHosts(hosts);
229 * Set the MR/UEB host(s) to use.
231 * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
232 * @return this builder
234 public PublisherBuilder usingHosts(Collection<String> hostlist) {
240 * Set the topic to publish on.
242 * @param topic The topic on which to publish messages.
243 * @return this builder
245 public PublisherBuilder onTopic(String topic) {
251 * Batch message sends with the given limits.
253 * @param messageCount The largest set of messages to batch.
254 * @param ageInMs The maximum age of a message waiting in a batch.
255 * @return this builder
257 public PublisherBuilder limitBatch(int messageCount, int ageInMs) {
258 fMaxBatchSize = messageCount;
259 fMaxBatchAgeMs = ageInMs;
264 * Compress transactions.
266 * @return this builder
268 public PublisherBuilder withCompresion() {
269 return enableCompresion(true);
273 * Do not compress transactions.
275 * @return this builder
277 public PublisherBuilder withoutCompresion() {
278 return enableCompresion(false);
282 * Set the compression option.
284 * @param compress true to gzip compress transactions
285 * @return this builder
287 public PublisherBuilder enableCompresion(boolean compress) {
288 fCompress = compress;
293 * Set the API key and secret for this client.
297 * @return this builder
299 public PublisherBuilder authenticatedBy(String apiKey, String apiSecret) {
301 fApiSecret = apiSecret;
306 * Build the publisher.
308 * @return a batching publisher
310 public MRBatchingPublisher build() {
311 if (fHosts == null || fHosts.isEmpty() || fTopic == null) {
312 throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
315 if (sfPublisherMock != null) {
316 return sfPublisherMock;
319 final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
322 .batchTo(fMaxBatchSize, fMaxBatchAgeMs)
325 if (fApiKey != null) {
326 pub.setApiCredentials(fApiKey, fApiSecret);
331 private Collection<String> fHosts = null;
332 private String fTopic = null;
333 private int fMaxBatchSize = 1;
334 private int fMaxBatchAgeMs = 1;
335 private boolean fCompress = false;
336 private String fApiKey = null;
337 private String fApiSecret = null;
341 * A builder for an identity manager.
345 public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> {
348 protected MRIdentityManager constructClient(Collection<String> hosts) {
350 return new MRMetaClient(hosts);
351 } catch (MalformedURLException e) {
352 throw new IllegalArgumentException(e);
358 * A builder for a topic manager.
362 public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> {
365 protected MRTopicManager constructClient(Collection<String> hosts) {
367 return new MRMetaClient(hosts);
368 } catch (MalformedURLException e) {
369 throw new IllegalArgumentException(e);
375 * Inject a consumer. Used to support unit tests.
379 public static void $testInject(MRConsumer cc) {
384 * Inject a publisher. Used to support unit tests.
388 public static void $testInject(MRBatchingPublisher pub) {
389 sfPublisherMock = pub;
392 static MRConsumer sfConsumerMock = null;
393 static MRBatchingPublisher sfPublisherMock = null;
396 * A builder for an identity manager.
400 public abstract static class AbstractAuthenticatedManagerBuilder<T extends MRClient> {
405 * @param hostList a comma-separated list of hosts to use to connect to MR
406 * @return this builder
408 public AbstractAuthenticatedManagerBuilder<T> usingHosts(String hostList) {
409 return usingHosts(MRConsumerImpl.stringToList(hostList));
415 * @param hostSet a set of hosts to use to connect to MR
416 * @return this builder
418 public AbstractAuthenticatedManagerBuilder<T> usingHosts(Collection<String> hostSet) {
424 * Set the API key and secret for this client.
428 * @return this builder
430 public AbstractAuthenticatedManagerBuilder<T> authenticatedBy(String apiKey, String apiSecret) {
432 fApiSecret = apiSecret;
437 * Build the consumer.
442 if (fHosts.isEmpty()) {
443 throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
446 final T mgr = constructClient(fHosts);
447 mgr.setApiCredentials(fApiKey, fApiSecret);
451 protected abstract T constructClient(Collection<String> hosts);
453 private Collection<String> fHosts = null;
454 private String fApiKey = null;
455 private String fApiSecret = null;