Sonar issues review, fix properties constant name.
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientBuilders.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  *   Modifications Copyright © 2021 Orange.
9  *  ================================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *        http://www.apache.org/licenses/LICENSE-2.0
14  *
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *  ============LICENSE_END=========================================================
21  *
22  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  *
24  *******************************************************************************/
25
26 package org.onap.dmaap.mr.client;
27
28 import java.net.MalformedURLException;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.TreeSet;
32 import java.util.UUID;
33 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
34 import org.onap.dmaap.mr.client.impl.MRMetaClient;
35 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * A collection of builders for various types of MR API clients.
41  *
42  * @author author
43  */
44 public class MRClientBuilders {
45     private static final Logger logger = LoggerFactory.getLogger(MRClientBuilders.class);
46
47     private static final String ILLEGAL_ARGUMENT_MESSAGE = "You must provide at least one host and a topic name.";
48
49     /**
50      * Instantiates MRClientBuilders.
51      */
52     private MRClientBuilders() {
53         // prevent instantiation
54     }
55
56     /**
57      * A builder for a topic Consumer.
58      *
59      * @author author
60      */
61     public static class ConsumerBuilder {
62
63         /**
64          * Set the host list.
65          *
66          * @param hostList a comma-separated list of hosts to use to connect to MR
67          * @return this builder
68          */
69         public ConsumerBuilder usingHosts(String hostList) {
70             return usingHosts(MRConsumerImpl.stringToList(hostList));
71         }
72
73         /**
74          * Set the host list.
75          *
76          * @param hostSet a set of hosts to use to connect to MR
77          * @return this builder
78          */
79         public ConsumerBuilder usingHosts(Collection<String> hostSet) {
80             fHosts = hostSet;
81             return this;
82         }
83
84         /**
85          * Set the topic.
86          *
87          * @param topic the name of the topic to consume
88          * @return this builder
89          */
90         public ConsumerBuilder onTopic(String topic) {
91             fTopic = topic;
92             return this;
93         }
94
95         /**
96          * Set the consumer's group and ID.
97          *
98          * @param consumerGroup The name of the consumer group this consumer is part of
99          * @param consumerId    The unique id of this consumer in its group
100          * @return this builder
101          */
102         public ConsumerBuilder knownAs(String consumerGroup, String consumerId) {
103             fGroup = consumerGroup;
104             fId = consumerId;
105             return this;
106         }
107
108         /**
109          * Set the API key and secret for this client.
110          *
111          * @param apiKey
112          * @param apiSecret
113          * @return this builder
114          */
115         public ConsumerBuilder authenticatedBy(String apiKey, String apiSecret) {
116             fApiKey = apiKey;
117             fApiSecret = apiSecret;
118             return this;
119         }
120
121         /**
122          * Set the server side timeout.
123          *
124          * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
125          * @return this builder
126          */
127         public ConsumerBuilder waitAtServer(int timeoutMs) {
128             fTimeoutMs = timeoutMs;
129             return this;
130         }
131
132         /**
133          * Set the maximum number of messages to receive per transaction.
134          *
135          * @param limit The maximum number of messages to receive from the server in one transaction.
136          * @return this builder
137          */
138         public ConsumerBuilder receivingAtMost(int limit) {
139             fLimit = limit;
140             return this;
141         }
142
143         /**
144          * Set a filter to use on the server.
145          *
146          * @param filter a Highland Park standard library filter encoded in JSON
147          * @return this builder
148          */
149         public ConsumerBuilder withServerSideFilter(String filter) {
150             fFilter = filter;
151             return this;
152         }
153
154         /**
155          * Build the consumer.
156          *
157          * @return a consumer
158          */
159         public MRConsumer build() {
160             if (fHosts == null || fHosts.isEmpty() || fTopic == null) {
161                 throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
162             }
163
164             if (fGroup == null) {
165                 fGroup = UUID.randomUUID().toString();
166                 fId = "0";
167                 logger.info("Creating non-restartable client with group {} and ID {}.", fGroup, fId);
168             }
169
170             if (sfConsumerMock != null) {
171                 return sfConsumerMock;
172             }
173             try {
174                 return new MRConsumerImpl.MRConsumerImplBuilder().setHostPart(fHosts)
175                         .setTopic(fTopic).setConsumerGroup(fGroup).setConsumerId(fId)
176                         .setTimeoutMs(fTimeoutMs).setLimit(fLimit).setFilter(fFilter)
177                         .setApiKey_username(fApiKey).setApiSecret_password(fApiSecret)
178                         .createMRConsumerImpl();
179             } catch (MalformedURLException e) {
180                 throw new IllegalArgumentException(e);
181             }
182         }
183
184         private Collection<String> fHosts = null;
185         private String fTopic = null;
186         private String fGroup = null;
187         private String fId = null;
188         private String fApiKey = null;
189         private String fApiSecret = null;
190         private int fTimeoutMs = -1;
191         private int fLimit = -1;
192         private String fFilter = null;
193     }
194
195     //*************************************************************************/
196     //*************************************************************************/
197     //*************************************************************************/
198
199     /**
200      * A publisher builder.
201      *
202      * @author author
203      */
204     public static class PublisherBuilder {
205
206         /**
207          * Set the MR/UEB host(s) to use.
208          *
209          * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
210          * @return this builder
211          */
212         public PublisherBuilder usingHosts(String hostlist) {
213             return usingHosts(MRConsumerImpl.stringToList(hostlist));
214         }
215
216         /**
217          * Set the MR/UEB host(s) to use.
218          *
219          * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
220          * @return this builder
221          */
222         public PublisherBuilder usingHosts(String[] hostSet) {
223             final TreeSet<String> hosts = new TreeSet<>();
224             Collections.addAll(hosts, hostSet);
225             return usingHosts(hosts);
226         }
227
228         /**
229          * Set the MR/UEB host(s) to use.
230          *
231          * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
232          * @return this builder
233          */
234         public PublisherBuilder usingHosts(Collection<String> hostlist) {
235             fHosts = hostlist;
236             return this;
237         }
238
239         /**
240          * Set the topic to publish on.
241          *
242          * @param topic The topic on which to publish messages.
243          * @return this builder
244          */
245         public PublisherBuilder onTopic(String topic) {
246             fTopic = topic;
247             return this;
248         }
249
250         /**
251          * Batch message sends with the given limits.
252          *
253          * @param messageCount The largest set of messages to batch.
254          * @param ageInMs      The maximum age of a message waiting in a batch.
255          * @return this builder
256          */
257         public PublisherBuilder limitBatch(int messageCount, int ageInMs) {
258             fMaxBatchSize = messageCount;
259             fMaxBatchAgeMs = ageInMs;
260             return this;
261         }
262
263         /**
264          * Compress transactions.
265          *
266          * @return this builder
267          */
268         public PublisherBuilder withCompresion() {
269             return enableCompresion(true);
270         }
271
272         /**
273          * Do not compress transactions.
274          *
275          * @return this builder
276          */
277         public PublisherBuilder withoutCompresion() {
278             return enableCompresion(false);
279         }
280
281         /**
282          * Set the compression option.
283          *
284          * @param compress true to gzip compress transactions
285          * @return this builder
286          */
287         public PublisherBuilder enableCompresion(boolean compress) {
288             fCompress = compress;
289             return this;
290         }
291
292         /**
293          * Set the API key and secret for this client.
294          *
295          * @param apiKey
296          * @param apiSecret
297          * @return this builder
298          */
299         public PublisherBuilder authenticatedBy(String apiKey, String apiSecret) {
300             fApiKey = apiKey;
301             fApiSecret = apiSecret;
302             return this;
303         }
304
305         /**
306          * Build the publisher.
307          *
308          * @return a batching publisher
309          */
310         public MRBatchingPublisher build() {
311             if (fHosts == null || fHosts.isEmpty() || fTopic == null) {
312                 throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
313             }
314
315             if (sfPublisherMock != null) {
316                 return sfPublisherMock;
317             }
318
319             final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
320                     .againstUrls(fHosts)
321                     .onTopic(fTopic)
322                     .batchTo(fMaxBatchSize, fMaxBatchAgeMs)
323                     .compress(fCompress)
324                     .build();
325             if (fApiKey != null) {
326                 pub.setApiCredentials(fApiKey, fApiSecret);
327             }
328             return pub;
329         }
330
331         private Collection<String> fHosts = null;
332         private String fTopic = null;
333         private int fMaxBatchSize = 1;
334         private int fMaxBatchAgeMs = 1;
335         private boolean fCompress = false;
336         private String fApiKey = null;
337         private String fApiSecret = null;
338     }
339
340     /**
341      * A builder for an identity manager.
342      *
343      * @author author
344      */
345     public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager> {
346
347         @Override
348         protected MRIdentityManager constructClient(Collection<String> hosts) {
349             try {
350                 return new MRMetaClient(hosts);
351             } catch (MalformedURLException e) {
352                 throw new IllegalArgumentException(e);
353             }
354         }
355     }
356
357     /**
358      * A builder for a topic manager.
359      *
360      * @author author
361      */
362     public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager> {
363
364         @Override
365         protected MRTopicManager constructClient(Collection<String> hosts) {
366             try {
367                 return new MRMetaClient(hosts);
368             } catch (MalformedURLException e) {
369                 throw new IllegalArgumentException(e);
370             }
371         }
372     }
373
374     /**
375      * Inject a consumer. Used to support unit tests.
376      *
377      * @param cc
378      */
379     public static void $testInject(MRConsumer cc) {
380         sfConsumerMock = cc;
381     }
382
383     /**
384      * Inject a publisher. Used to support unit tests.
385      *
386      * @param pub
387      */
388     public static void $testInject(MRBatchingPublisher pub) {
389         sfPublisherMock = pub;
390     }
391
392     static MRConsumer sfConsumerMock = null;
393     static MRBatchingPublisher sfPublisherMock = null;
394
395     /**
396      * A builder for an identity manager.
397      *
398      * @author author
399      */
400     public abstract static class AbstractAuthenticatedManagerBuilder<T extends MRClient> {
401
402         /**
403          * Set the host list.
404          *
405          * @param hostList a comma-separated list of hosts to use to connect to MR
406          * @return this builder
407          */
408         public AbstractAuthenticatedManagerBuilder<T> usingHosts(String hostList) {
409             return usingHosts(MRConsumerImpl.stringToList(hostList));
410         }
411
412         /**
413          * Set the host list.
414          *
415          * @param hostSet a set of hosts to use to connect to MR
416          * @return this builder
417          */
418         public AbstractAuthenticatedManagerBuilder<T> usingHosts(Collection<String> hostSet) {
419             fHosts = hostSet;
420             return this;
421         }
422
423         /**
424          * Set the API key and secret for this client.
425          *
426          * @param apiKey
427          * @param apiSecret
428          * @return this builder
429          */
430         public AbstractAuthenticatedManagerBuilder<T> authenticatedBy(String apiKey, String apiSecret) {
431             fApiKey = apiKey;
432             fApiSecret = apiSecret;
433             return this;
434         }
435
436         /**
437          * Build the consumer.
438          *
439          * @return a consumer
440          */
441         public T build() {
442             if (fHosts.isEmpty()) {
443                 throw new IllegalArgumentException(ILLEGAL_ARGUMENT_MESSAGE);
444             }
445
446             final T mgr = constructClient(fHosts);
447             mgr.setApiCredentials(fApiKey, fApiSecret);
448             return mgr;
449         }
450
451         protected abstract T constructClient(Collection<String> hosts);
452
453         private Collection<String> fHosts = null;
454         private String fApiKey = null;
455         private String fApiSecret = null;
456     }
457
458 }