60e0666fb45f362565a6cff349357a30ae6cf458
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / MRClientBuilders.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *   Modifications Copyright © 2018 IBM.
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *  
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *  
23  *******************************************************************************/
24 package org.onap.dmaap.mr.client;
25
26 import java.net.MalformedURLException;
27 import java.util.Collection;
28 import java.util.TreeSet;
29 import java.util.UUID;
30
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
35 import org.onap.dmaap.mr.client.impl.MRMetaClient;
36 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
37
38 /**
39  * A collection of builders for various types of MR API clients
40  * 
41  * @author author
42  */
43 public class MRClientBuilders
44 {
45
46     /**
47      * Instantiates MRClientBuilders.
48      */
49     private MRClientBuilders() {
50         // prevent instantiation
51     }
52     
53     /**
54      * A builder for a topic Consumer
55      * @author author
56      */
57     public static class ConsumerBuilder
58     {
59         /**
60          * Construct a consumer builder.
61          */
62         public ConsumerBuilder () {}
63
64         /**
65          * Set the host list
66          * @param hostList a comma-separated list of hosts to use to connect to MR
67          * @return this builder
68          */
69         public ConsumerBuilder usingHosts ( String hostList ) { 
70             return usingHosts ( MRConsumerImpl.stringToList(hostList) ); 
71         }
72
73         /**
74          * Set the host list
75          * @param hostSet a set of hosts to use to connect to MR
76          * @return this builder
77          */
78         public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { 
79             fHosts = hostSet; return this; 
80         }
81
82         /**
83          * Set the topic
84          * @param topic the name of the topic to consume
85          * @return this builder
86          */
87         public ConsumerBuilder onTopic ( String topic ) { 
88             fTopic=topic; 
89             return this; 
90         }
91
92         /**
93          * Set the consumer's group and ID
94          * @param consumerGroup The name of the consumer group this consumer is part of
95          * @param consumerId The unique id of this consumer in its group
96          * @return this builder
97          */
98         public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { 
99             fGroup = consumerGroup; 
100             fId = consumerId; 
101             return this; 
102         }
103
104         /**
105          * Set the API key and secret for this client.
106          * @param apiKey
107          * @param apiSecret
108          * @return this builder
109          */
110         public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { 
111             fApiKey = apiKey; 
112             fApiSecret = apiSecret; 
113             return this; 
114         }
115
116         /**
117          * Set the server side timeout
118          * @param timeoutMs    The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
119          * @return this builder
120          */
121         public ConsumerBuilder waitAtServer ( int timeoutMs ) { 
122             fTimeoutMs = timeoutMs; 
123             return this; 
124         };
125
126         /**
127          * Set the maximum number of messages to receive per transaction
128          * @param limit The maximum number of messages to receive from the server in one transaction.
129          * @return this builder
130          */
131         public ConsumerBuilder receivingAtMost ( int limit ) { 
132             fLimit = limit; 
133             return this; 
134         };
135
136         /**
137          * Set a filter to use on the server
138          * @param filter a Highland Park standard library filter encoded in JSON
139          * @return this builder
140          */
141         public ConsumerBuilder withServerSideFilter ( String filter ) { 
142             fFilter = filter; 
143             return this; 
144         }
145
146         /**
147          * Build the consumer
148          * @return a consumer
149          */
150         public MRConsumer build ()
151         {
152             if ( fHosts == null || fHosts.size() == 0 || fTopic == null )
153             {
154                 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
155             }
156
157             if ( fGroup == null )
158             {
159                 fGroup = UUID.randomUUID ().toString ();
160                 fId = "0";
161                 log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
162             }
163
164             if ( sfConsumerMock != null ) return sfConsumerMock;
165             try {
166                 return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
167             } catch (MalformedURLException e) {
168                 throw new IllegalArgumentException(e);
169             }
170         }
171
172         private Collection<String> fHosts = null;
173         private String fTopic = null;
174         private String fGroup = null;
175         private String fId = null;
176         private String fApiKey = null;
177         private String fApiSecret = null;
178         private int fTimeoutMs = -1;
179         private int fLimit = -1;
180         private String fFilter = null;
181     }
182
183     /*************************************************************************/
184     /*************************************************************************/
185     /*************************************************************************/
186
187     /**
188      * A publisher builder
189      * @author author
190      */
191     public static class PublisherBuilder
192     {
193         public PublisherBuilder () {}
194
195         /**
196          * Set the MR/UEB host(s) to use
197          * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
198          * @return this builder
199          */
200         public PublisherBuilder usingHosts ( String hostlist ) { 
201             return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); 
202         }
203
204         /**
205          * Set the MR/UEB host(s) to use
206          * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
207          * @return this builder
208          */
209         public PublisherBuilder usingHosts ( String[] hostSet )
210         {
211             final TreeSet<String> hosts = new TreeSet<String> ();
212             for ( String hp : hostSet )
213             {
214                 hosts.add ( hp );
215             }
216             return usingHosts ( hosts );
217         }
218
219         /**
220          * Set the MR/UEB host(s) to use
221          * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
222          * @return this builder
223          */
224         public PublisherBuilder usingHosts ( Collection<String> hostlist ) { 
225             fHosts=hostlist; 
226             return this; 
227         }
228
229         /**
230          * Set the topic to publish on
231          * @param topic The topic on which to publish messages.
232          * @return this builder
233          */
234         public PublisherBuilder onTopic ( String topic ) { 
235             fTopic = topic; 
236             return this; 
237         }
238
239         /**
240          * Batch message sends with the given limits.
241          * @param messageCount The largest set of messages to batch.
242          * @param ageInMs The maximum age of a message waiting in a batch.
243          * @return this builder
244          */
245         public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { 
246             fMaxBatchSize = messageCount; 
247             fMaxBatchAgeMs = ageInMs; 
248             return this; 
249         }
250
251         /**
252          * Compress transactions
253          * @return this builder
254          */
255         public PublisherBuilder withCompresion () { 
256             return enableCompresion(true); 
257         }
258
259         /**
260          * Do not compress transactions
261          * @return this builder
262          */
263         public PublisherBuilder withoutCompresion () { 
264             return enableCompresion(false); 
265         }
266
267         /**
268          * Set the compression option
269          * @param compress true to gzip compress transactions
270          * @return this builder
271          */
272         public PublisherBuilder enableCompresion ( boolean compress ) { 
273             fCompress = compress; 
274             return this; 
275         }
276
277         /**
278          * Set the API key and secret for this client.
279          * @param apiKey
280          * @param apiSecret
281          * @return this builder
282          */
283         public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { 
284             fApiKey = apiKey; 
285             fApiSecret = apiSecret; 
286             return this; 
287         }
288
289         /**
290          * Build the publisher
291          * @return a batching publisher
292          */
293         public MRBatchingPublisher build ()
294         {
295             if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
296             {
297                 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
298             }
299
300             if ( sfPublisherMock != null ) return sfPublisherMock;
301
302             final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
303                 againstUrls ( fHosts ).
304                 onTopic ( fTopic ).
305                 batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
306                 compress ( fCompress ).
307                 build ();
308             if ( fApiKey != null )
309             {
310                 pub.setApiCredentials ( fApiKey, fApiSecret );
311             }
312             return pub;
313         }
314         
315         private Collection<String> fHosts = null;
316         private String fTopic = null;
317         private int fMaxBatchSize = 1;
318         private int fMaxBatchAgeMs = 1;
319         private boolean fCompress = false;
320         private String fApiKey = null;
321         private String fApiSecret = null;
322     }
323
324     /**
325      * A builder for an identity manager
326      * @author author
327      */
328     public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
329     {
330         /**
331          * Construct an identity manager builder.
332          */
333         public IdentityManagerBuilder () {}
334
335         @Override
336         protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
337             return new MRMetaClient ( hosts );
338         } catch (MalformedURLException e) {
339             throw new IllegalArgumentException(e);
340         } }
341     }
342
343     /**
344      * A builder for a topic manager
345      * @author author
346      */
347     public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
348     {
349         /**
350          * Construct an topic manager builder.
351          */
352         public TopicManagerBuilder () {}
353
354         @Override
355         protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
356             return new MRMetaClient ( hosts );
357         } catch (MalformedURLException e) {
358             throw new IllegalArgumentException(e);
359         } }
360     }
361
362     /**
363      * Inject a consumer. Used to support unit tests.
364      * @param cc
365      */
366     public static void $testInject ( MRConsumer cc )
367     {
368         sfConsumerMock = cc;
369     }
370
371     /**
372      * Inject a publisher. Used to support unit tests.
373      * @param pub
374      */
375     public static void $testInject ( MRBatchingPublisher pub )
376     {
377         sfPublisherMock = pub;
378     }
379
380     static MRConsumer sfConsumerMock = null;
381     static MRBatchingPublisher sfPublisherMock = null;
382
383     /**
384      * A builder for an identity manager
385      * @author author
386      */
387     public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
388     {
389         /**
390          * Construct an identity manager builder.
391          */
392         public AbstractAuthenticatedManagerBuilder () {}
393
394         /**
395          * Set the host list
396          * @param hostList a comma-separated list of hosts to use to connect to MR
397          * @return this builder
398          */
399         public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { 
400             return usingHosts ( MRConsumerImpl.stringToList(hostList) ); 
401         }
402
403         /**
404          * Set the host list
405          * @param hostSet a set of hosts to use to connect to MR
406          * @return this builder
407          */
408         public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { 
409             fHosts = hostSet; 
410             return this; 
411         }
412
413         /**
414          * Set the API key and secret for this client.
415          * @param apiKey
416          * @param apiSecret
417          * @return this builder
418          */
419         public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { 
420             fApiKey = apiKey; 
421             fApiSecret = apiSecret; 
422             return this; 
423         }
424
425         /**
426          * Build the consumer
427          * @return a consumer
428          */
429         public T build ()
430         {
431              if ( fHosts.isEmpty() ) 
432             {
433                 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
434             }
435
436             final T mgr = constructClient ( fHosts );
437             mgr.setApiCredentials ( fApiKey, fApiSecret );
438             return mgr;
439         }
440
441         protected abstract T constructClient ( Collection<String> hosts );
442
443         private Collection<String> fHosts = null;
444         private String fApiKey = null;
445         private String fApiSecret = null;
446     }
447     
448     private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
449 }