Merge "sonar critical for Conditional Statement"
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / metrics / publisher / DMaaPCambriaClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.metrics.publisher;
23
24 import java.net.MalformedURLException;
25 import java.util.Collection;
26 import java.util.TreeSet;
27 import java.util.UUID;
28
29 import com.att.nsa.cambria.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
30 import com.att.nsa.cambria.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
31
32 /**
33  * A factory for Cambria clients.<br/>
34  * <br/>
35  * Use caution selecting a consumer creator factory. If the call doesn't accept
36  * a consumer group name, then it creates a consumer that is not restartable.
37  * That is, if you stop your process and start it again, your client will NOT
38  * receive any missed messages on the topic. If you need to ensure receipt of
39  * missed messages, then you must use a consumer that's created with a group
40  * name and ID. (If you create multiple consumer processes using the same group,
41  * load is split across them. Be sure to use a different ID for each instance.)<br/>
42  * <br/>
43  * Publishers
44  * 
45  * @author author
46  */
47 public class DMaaPCambriaClientFactory {
48         /**
49          * Create a consumer instance with the default timeout and no limit on
50          * messages returned. This consumer operates as an independent consumer
51          * (i.e., not in a group) and is NOT re-startable across sessions.
52          * 
53          * @param hostList
54          *            A comma separated list of hosts to use to connect to Cambria.
55          *            You can include port numbers (3904 is the default). 
56          * 
57          * @param topic
58          *            The topic to consume
59          * 
60          * @return a consumer
61          */
62         public static CambriaConsumer createConsumer(String hostList, String topic) {
63                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
64                                 topic);
65         }
66
67         /**
68          * Create a consumer instance with the default timeout and no limit on
69          * messages returned. This consumer operates as an independent consumer
70          * (i.e., not in a group) and is NOT re-startable across sessions.
71          * 
72          * @param hostSet
73          *            The host used in the URL to Cambria. Entries can be
74          *            "host:port".
75          * @param topic
76          *            The topic to consume
77          * 
78          * @return a consumer
79          */
80         public static CambriaConsumer createConsumer(Collection<String> hostSet,
81                         String topic) {
82                 return createConsumer(hostSet, topic, null);
83         }
84
85         /**
86          * Create a consumer instance with server-side filtering, the default
87          * timeout, and no limit on messages returned. This consumer operates as an
88          * independent consumer (i.e., not in a group) and is NOT re-startable
89          * across sessions.
90          * 
91          * @param hostSet
92          *            The host used in the URL to Cambria. Entries can be
93          *            "host:port".
94          * @param topic
95          *            The topic to consume
96          * @param filter
97          *            a filter to use on the server side
98          * 
99          * @return a consumer
100          */
101         public static CambriaConsumer createConsumer(Collection<String> hostSet,
102                         String topic, String filter) {
103                 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
104                                 "0", -1, -1, filter, null, null);
105         }
106
107         /**
108          * Create a consumer instance with the default timeout, and no limit on
109          * messages returned. This consumer can operate in a logical group and is
110          * re-startable across sessions when you use the same group and ID on
111          * restart.
112          * 
113          * @param hostSet
114          *            The host used in the URL to Cambria. Entries can be
115          *            "host:port".
116          * @param topic
117          *            The topic to consume
118          * @param consumerGroup
119          *            The name of the consumer group this consumer is part of
120          * @param consumerId
121          *            The unique id of this consume in its group
122          * 
123          * @return a consumer
124          */
125         public static CambriaConsumer createConsumer(Collection<String> hostSet,
126                         final String topic, final String consumerGroup,
127                         final String consumerId) {
128                 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
129         }
130
131         /**
132          * Create a consumer instance with the default timeout, and no limit on
133          * messages returned. This consumer can operate in a logical group and is
134          * re-startable across sessions when you use the same group and ID on
135          * restart.
136          * 
137          * @param hostSet
138          *            The host used in the URL to Cambria. Entries can be
139          *            "host:port".
140          * @param topic
141          *            The topic to consume
142          * @param consumerGroup
143          *            The name of the consumer group this consumer is part of
144          * @param consumerId
145          *            The unique id of this consume in its group
146          * @param timeoutMs
147          *            The amount of time in milliseconds that the server should keep
148          *            the connection open while waiting for message traffic. Use -1
149          *            for default timeout.
150          * @param limit
151          *            A limit on the number of messages returned in a single call.
152          *            Use -1 for no limit.
153          * 
154          * @return a consumer
155          */
156         public static CambriaConsumer createConsumer(Collection<String> hostSet,
157                         final String topic, final String consumerGroup,
158                         final String consumerId, int timeoutMs, int limit) {
159                 return createConsumer(hostSet, topic, consumerGroup, consumerId,
160                                 timeoutMs, limit, null, null, null);
161         }
162
163         /**
164          * Create a consumer instance with the default timeout, and no limit on
165          * messages returned. This consumer can operate in a logical group and is
166          * re-startable across sessions when you use the same group and ID on
167          * restart. This consumer also uses server-side filtering.
168          * 
169          * @param hostList
170          *            A comma separated list of hosts to use to connect to Cambria.
171          *            You can include port numbers (3904 is the default). 
172          * @param topic
173          *            The topic to consume
174          * @param consumerGroup
175          *            The name of the consumer group this consumer is part of
176          * @param consumerId
177          *            The unique id of this consume in its group
178          * @param timeoutMs
179          *            The amount of time in milliseconds that the server should keep
180          *            the connection open while waiting for message traffic. Use -1
181          *            for default timeout.
182          * @param limit
183          *            A limit on the number of messages returned in a single call.
184          *            Use -1 for no limit.
185          * @param filter
186          *            A Highland Park filter expression using only built-in filter
187          *            components. Use null for "no filter".
188          * @param apiKey
189          *            key associated with a user
190          * @param apiSecret
191          *            of a user
192          * 
193          * @return a consumer
194          */
195         public static CambriaConsumer createConsumer(String hostList,
196                         final String topic, final String consumerGroup,
197                         final String consumerId, int timeoutMs, int limit, String filter,
198                         String apiKey, String apiSecret) {
199                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
200                                 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
201                                 apiKey, apiSecret);
202         }
203
204         /**
205          * Create a consumer instance with the default timeout, and no limit on
206          * messages returned. This consumer can operate in a logical group and is
207          * re-startable across sessions when you use the same group and ID on
208          * restart. This consumer also uses server-side filtering.
209          * 
210          * @param hostSet
211          *            The host used in the URL to Cambria. Entries can be
212          *            "host:port".
213          * @param topic
214          *            The topic to consume
215          * @param consumerGroup
216          *            The name of the consumer group this consumer is part of
217          * @param consumerId
218          *            The unique id of this consume in its group
219          * @param timeoutMs
220          *            The amount of time in milliseconds that the server should keep
221          *            the connection open while waiting for message traffic. Use -1
222          *            for default timeout.
223          * @param limit
224          *            A limit on the number of messages returned in a single call.
225          *            Use -1 for no limit.
226          * @param filter
227          *            A Highland Park filter expression using only built-in filter
228          *            components. Use null for "no filter".
229          * @param apiKey
230          *            key associated with a user
231          * @param apiSecret
232          *            of a user
233          * @return a consumer
234          */
235         public static CambriaConsumer createConsumer(Collection<String> hostSet,
236                         final String topic, final String consumerGroup,
237                         final String consumerId, int timeoutMs, int limit, String filter,
238                         String apiKey, String apiSecret) {
239                 if (sfMock != null)
240                         return sfMock;
241                 try {
242                         return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
243                                         consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
244                 } catch (MalformedURLException e) {
245                         throw new RuntimeException(e);
246                 }
247         }
248
249         /*************************************************************************/
250         /*************************************************************************/
251         /*************************************************************************/
252
253         /**
254          * Create a publisher that sends each message (or group of messages)
255          * immediately. Most applications should favor higher latency for much
256          * higher message throughput and the "simple publisher" is not a good
257          * choice.
258          * 
259          * @param hostlist
260          *            The host used in the URL to Cambria. Can be "host:port", can
261          *            be multiple comma-separated entries.
262          * @param topic
263          *            The topic on which to publish messages.
264          * @return a publisher
265          */
266         public static CambriaBatchingPublisher createSimplePublisher(
267                         String hostlist, String topic) {
268                 return createBatchingPublisher(hostlist, topic, 1, 1);
269         }
270
271         /**
272          * Create a publisher that batches messages. Be sure to close the publisher
273          * to send the last batch and ensure a clean shutdown. Message payloads are
274          * not compressed.
275          * 
276          * @param hostlist
277          *            The host used in the URL to Cambria. Can be "host:port", can
278          *            be multiple comma-separated entries.
279          * @param topic
280          *            The topic on which to publish messages.
281          * @param maxBatchSize
282          *            The largest set of messages to batch
283          * @param maxAgeMs
284          *            The maximum age of a message waiting in a batch
285          * 
286          * @return a publisher
287          */
288         public static CambriaBatchingPublisher createBatchingPublisher(
289                         String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
290                 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
291                                 false);
292         }
293
294         /**
295          * Create a publisher that batches messages. Be sure to close the publisher
296          * to send the last batch and ensure a clean shutdown.
297          * 
298          * @param hostlist
299          *            The host used in the URL to Cambria. Can be "host:port", can
300          *            be multiple comma-separated entries.
301          * @param topic
302          *            The topic on which to publish messages.
303          * @param maxBatchSize
304          *            The largest set of messages to batch
305          * @param maxAgeMs
306          *            The maximum age of a message waiting in a batch
307          * @param compress
308          *            use gzip compression
309          * 
310          * @return a publisher
311          */
312         public static CambriaBatchingPublisher createBatchingPublisher(
313                         String hostlist, String topic, int maxBatchSize, long maxAgeMs,
314                         boolean compress) {
315                 return createBatchingPublisher(
316                                 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
317                                 maxBatchSize, maxAgeMs, compress);
318         }
319
320         /**
321          * Create a publisher that batches messages. Be sure to close the publisher
322          * to send the last batch and ensure a clean shutdown.
323          * 
324          * @param hostSet
325          *            A set of hosts to be used in the URL to Cambria. Can be
326          *            "host:port". Use multiple entries to enable failover.
327          * @param topic
328          *            The topic on which to publish messages.
329          * @param maxBatchSize
330          *            The largest set of messages to batch
331          * @param maxAgeMs
332          *            The maximum age of a message waiting in a batch
333          * @param compress
334          *            use gzip compression
335          * 
336          * @return a publisher
337          */
338         public static CambriaBatchingPublisher createBatchingPublisher(
339                         String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
340                         boolean compress) {
341                 final TreeSet<String> hosts = new TreeSet<String>();
342                 for (String hp : hostSet) {
343                         hosts.add(hp);
344                 }
345                 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
346                                 compress);
347         }
348
349         /**
350          * Create a publisher that batches messages. Be sure to close the publisher
351          * to send the last batch and ensure a clean shutdown.
352          * 
353          * @param hostSet
354          *            A set of hosts to be used in the URL to Cambria. Can be
355          *            "host:port". Use multiple entries to enable failover.
356          * @param topic
357          *            The topic on which to publish messages.
358          * @param maxBatchSize
359          *            The largest set of messages to batch
360          * @param maxAgeMs
361          *            The maximum age of a message waiting in a batch
362          * @param compress
363          *            use gzip compression
364          * 
365          * @return a publisher
366          */
367         public static CambriaBatchingPublisher createBatchingPublisher(
368                         Collection<String> hostSet, String topic, int maxBatchSize,
369                         long maxAgeMs, boolean compress) {
370                 return new DMaaPCambriaSimplerBatchPublisher.Builder()
371                                 .againstUrls(hostSet).onTopic(topic)
372                                 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
373         }
374
375         /**
376          * Create an identity manager client to work with API keys.
377          * 
378          * @param hostSet
379          *            A set of hosts to be used in the URL to Cambria. Can be
380          *            "host:port". Use multiple entries to enable failover.
381          * @param apiKey
382          *            Your API key
383          * @param apiSecret
384          *            Your API secret
385          * @return an identity manager
386          */
387         /*
388          * public static CambriaIdentityManager createIdentityManager (
389          * Collection<String> hostSet, String apiKey, String apiSecret ) { final
390          * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet );
391          * cim.setApiCredentials ( apiKey, apiSecret ); return cim; }
392          */
393
394         /**
395          * Create a topic manager for working with topics.
396          * 
397          * @param hostSet
398          *            A set of hosts to be used in the URL to Cambria. Can be
399          *            "host:port". Use multiple entries to enable failover.
400          * @param apiKey
401          *            Your API key
402          * @param apiSecret
403          *            Your API secret
404          * @return a topic manager
405          */
406         /*
407          * public static CambriaTopicManager createTopicManager ( Collection<String>
408          * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi
409          * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey,
410          * apiSecret ); return tmi; }
411          */
412
413         /**
414          * Inject a consumer. Used to support unit tests.
415          * 
416          * @param cc
417          */
418         public static void $testInject(CambriaConsumer cc) {
419                 sfMock = cc;
420         }
421
422         private static CambriaConsumer sfMock = null;
423 }