76df039ebb703150b9719c508af2c8baadd61f7c
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / MRClientBuilders.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client;
23
24 import java.net.MalformedURLException;
25 import java.util.Collection;
26 import java.util.TreeSet;
27 import java.util.UUID;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import com.att.nsa.mr.client.impl.MRConsumerImpl;
33 import com.att.nsa.mr.client.impl.MRMetaClient;
34 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
35
36 /**
37  * A collection of builders for various types of MR API clients
38  * 
39  * @author author
40  */
41 public class MRClientBuilders
42 {
43         /**
44          * A builder for a topic Consumer
45          * @author author
46          */
47         public static class ConsumerBuilder
48         {
49                 /**
50                  * Construct a consumer builder.
51                  */
52                 public ConsumerBuilder () {}
53
54                 /**
55                  * Set the host list
56                  * @param hostList a comma-separated list of hosts to use to connect to MR
57                  * @return this builder
58                  */
59                 public ConsumerBuilder usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
60
61                 /**
62                  * Set the host list
63                  * @param hostSet a set of hosts to use to connect to MR
64                  * @return this builder
65                  */
66                 public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; }
67
68                 /**
69                  * Set the topic
70                  * @param topic the name of the topic to consume
71                  * @return this builder
72                  */
73                 public ConsumerBuilder onTopic ( String topic ) { fTopic=topic; return this; }
74
75                 /**
76                  * Set the consumer's group and ID
77                  * @param consumerGroup The name of the consumer group this consumer is part of
78                  * @param consumerId The unique id of this consumer in its group
79                  * @return this builder
80                  */
81                 public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { fGroup = consumerGroup; fId = consumerId; return this; }
82
83                 /**
84                  * Set the API key and secret for this client.
85                  * @param apiKey
86                  * @param apiSecret
87                  * @return this builder
88                  */
89                 public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
90
91                 /**
92                  * Set the server side timeout
93                  * @param timeoutMs     The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
94                  * @return this builder
95                  */
96                 public ConsumerBuilder waitAtServer ( int timeoutMs ) { fTimeoutMs = timeoutMs; return this; };
97
98                 /**
99                  * Set the maximum number of messages to receive per transaction
100                  * @param limit The maximum number of messages to receive from the server in one transaction.
101                  * @return this builder
102                  */
103                 public ConsumerBuilder receivingAtMost ( int limit ) { fLimit = limit; return this; };
104
105                 /**
106                  * Set a filter to use on the server
107                  * @param filter a Highland Park standard library filter encoded in JSON
108                  * @return this builder
109                  */
110                 public ConsumerBuilder withServerSideFilter ( String filter ) { fFilter = filter; return this; }
111
112                 /**
113                  * Build the consumer
114                  * @return a consumer
115                  */
116                 public MRConsumer build ()
117                 {
118                         if ( fHosts == null || fHosts.size() == 0 || fTopic == null )
119                         {
120                                 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
121                         }
122
123                         if ( fGroup == null )
124                         {
125                                 fGroup = UUID.randomUUID ().toString ();
126                                 fId = "0";
127                                 log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
128                         }
129
130                         if ( sfConsumerMock != null ) return sfConsumerMock;
131                         try {
132                                 return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
133                         } catch (MalformedURLException e) {
134                                 throw new IllegalArgumentException(e);
135                         }
136                 }
137
138                 private Collection<String> fHosts = null;
139                 private String fTopic = null;
140                 private String fGroup = null;
141                 private String fId = null;
142                 private String fApiKey = null;
143                 private String fApiSecret = null;
144                 private int fTimeoutMs = -1;
145                 private int fLimit = -1;
146                 private String fFilter = null;
147         }
148
149         /*************************************************************************/
150         /*************************************************************************/
151         /*************************************************************************/
152
153         /**
154          * A publisher builder
155          * @author author
156          */
157         public static class PublisherBuilder
158         {
159                 public PublisherBuilder () {}
160
161                 /**
162                  * Set the MR/UEB host(s) to use
163                  * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
164                  * @return this builder
165                  */
166                 public PublisherBuilder usingHosts ( String hostlist ) { return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); }
167
168                 /**
169                  * Set the MR/UEB host(s) to use
170                  * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
171                  * @return this builder
172                  */
173                 public PublisherBuilder usingHosts ( String[] hostSet )
174                 {
175                         final TreeSet<String> hosts = new TreeSet<String> ();
176                         for ( String hp : hostSet )
177                         {
178                                 hosts.add ( hp );
179                         }
180                         return usingHosts ( hosts );
181                 }
182
183                 /**
184                  * Set the MR/UEB host(s) to use
185                  * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
186                  * @return this builder
187                  */
188                 public PublisherBuilder usingHosts ( Collection<String> hostlist ) { fHosts=hostlist; return this; }
189
190                 /**
191                  * Set the topic to publish on
192                  * @param topic The topic on which to publish messages.
193                  * @return this builder
194                  */
195                 public PublisherBuilder onTopic ( String topic ) { fTopic = topic; return this; }
196
197                 /**
198                  * Batch message sends with the given limits.
199                  * @param messageCount The largest set of messages to batch.
200                  * @param ageInMs The maximum age of a message waiting in a batch.
201                  * @return this builder
202                  */
203                 public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { fMaxBatchSize = messageCount; fMaxBatchAgeMs = ageInMs; return this; }
204
205                 /**
206                  * Compress transactions
207                  * @return this builder
208                  */
209                 public PublisherBuilder withCompresion () { return enableCompresion(true); }
210
211                 /**
212                  * Do not compress transactions
213                  * @return this builder
214                  */
215                 public PublisherBuilder withoutCompresion () { return enableCompresion(false); }
216
217                 /**
218                  * Set the compression option
219                  * @param compress true to gzip compress transactions
220                  * @return this builder
221                  */
222                 public PublisherBuilder enableCompresion ( boolean compress ) { fCompress = compress; return this; }
223
224                 /**
225                  * Set the API key and secret for this client.
226                  * @param apiKey
227                  * @param apiSecret
228                  * @return this builder
229                  */
230                 public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
231
232                 /**
233                  * Build the publisher
234                  * @return a batching publisher
235                  */
236                 public MRBatchingPublisher build ()
237                 {
238                         if ( fHosts == null || fHosts.isEmpty() || fTopic == null )
239                         {
240                                 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
241                         }
242
243                         if ( sfPublisherMock != null ) return sfPublisherMock;
244
245                         final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
246                                 againstUrls ( fHosts ).
247                                 onTopic ( fTopic ).
248                                 batchTo ( fMaxBatchSize, fMaxBatchAgeMs ).
249                                 compress ( fCompress ).
250                                 build ();
251                         if ( fApiKey != null )
252                         {
253                                 pub.setApiCredentials ( fApiKey, fApiSecret );
254                         }
255                         return pub;
256                 }
257                 
258                 private Collection<String> fHosts = null;
259                 private String fTopic = null;
260                 private int fMaxBatchSize = 1;
261                 private int fMaxBatchAgeMs = 1;
262                 private boolean fCompress = false;
263                 private String fApiKey = null;
264                 private String fApiSecret = null;
265         }
266
267         /**
268          * A builder for an identity manager
269          * @author author
270          */
271         public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
272         {
273                 /**
274                  * Construct an identity manager builder.
275                  */
276                 public IdentityManagerBuilder () {}
277
278                 @Override
279                 protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
280                         return new MRMetaClient ( hosts );
281                 } catch (MalformedURLException e) {
282                         throw new IllegalArgumentException(e);
283                 } }
284         }
285
286         /**
287          * A builder for a topic manager
288          * @author author
289          */
290         public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
291         {
292                 /**
293                  * Construct an topic manager builder.
294                  */
295                 public TopicManagerBuilder () {}
296
297                 @Override
298                 protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
299                         return new MRMetaClient ( hosts );
300                 } catch (MalformedURLException e) {
301                         throw new IllegalArgumentException(e);
302                 } }
303         }
304
305         /**
306          * Inject a consumer. Used to support unit tests.
307          * @param cc
308          */
309         public static void $testInject ( MRConsumer cc )
310         {
311                 sfConsumerMock = cc;
312         }
313
314         /**
315          * Inject a publisher. Used to support unit tests.
316          * @param pub
317          */
318         public static void $testInject ( MRBatchingPublisher pub )
319         {
320                 sfPublisherMock = pub;
321         }
322
323         static MRConsumer sfConsumerMock = null;
324         static MRBatchingPublisher sfPublisherMock = null;
325
326         /**
327          * A builder for an identity manager
328          * @author author
329          */
330         public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
331         {
332                 /**
333                  * Construct an identity manager builder.
334                  */
335                 public AbstractAuthenticatedManagerBuilder () {}
336
337                 /**
338                  * Set the host list
339                  * @param hostList a comma-separated list of hosts to use to connect to MR
340                  * @return this builder
341                  */
342                 public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
343
344                 /**
345                  * Set the host list
346                  * @param hostSet a set of hosts to use to connect to MR
347                  * @return this builder
348                  */
349                 public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; }
350
351                 /**
352                  * Set the API key and secret for this client.
353                  * @param apiKey
354                  * @param apiSecret
355                  * @return this builder
356                  */
357                 public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
358
359                 /**
360                  * Build the consumer
361                  * @return a consumer
362                  */
363                 public T build ()
364                 {
365                          if ( fHosts.isEmpty() ) 
366                         {
367                                 throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
368                         }
369
370                         final T mgr = constructClient ( fHosts );
371                         mgr.setApiCredentials ( fApiKey, fApiSecret );
372                         return mgr;
373                 }
374
375                 protected abstract T constructClient ( Collection<String> hosts );
376
377                 private Collection<String> fHosts = null;
378                 private String fApiKey = null;
379                 private String fApiSecret = null;
380         }
381
382         private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
383 }