9158c9695349e63e94a66b12cf4fb5ed5e444e82
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / metrics / publisher / DMaaPCambriaClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.metrics.publisher;
23
24 import java.net.MalformedURLException;
25 import java.util.Collection;
26 import java.util.TreeSet;
27 import java.util.UUID;
28
29 import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
30 import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
31
32 /**
33  * A factory for Cambria clients.<br/>
34  * <br/>
35  * Use caution selecting a consumer creator factory. If the call doesn't accept
36  * a consumer group name, then it creates a consumer that is not restartable.
37  * That is, if you stop your process and start it again, your client will NOT
38  * receive any missed messages on the topic. If you need to ensure receipt of
39  * missed messages, then you must use a consumer that's created with a group
40  * name and ID. (If you create multiple consumer processes using the same group,
41  * load is split across them. Be sure to use a different ID for each instance.)<br/>
42  * <br/>
43  * Publishers
44  * 
45  * @author peter
46  */
47 public class DMaaPCambriaClientFactory {
48         /**
49          * Create a consumer instance with the default timeout and no limit on
50          * messages returned. This consumer operates as an independent consumer
51          * (i.e., not in a group) and is NOT re-startable across sessions.
52          * 
53          * @param hostList
54          *            A comma separated list of hosts to use to connect to Cambria.
55          *            You can include port numbers (3904 is the default). For
56          *            example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
57          * 
58          * @param topic
59          *            The topic to consume
60          * 
61          * @return a consumer
62          */
63         public static CambriaConsumer createConsumer(String hostList, String topic) {
64                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
65                                 topic);
66         }
67
68         /**
69          * Create a consumer instance with the default timeout and no limit on
70          * messages returned. This consumer operates as an independent consumer
71          * (i.e., not in a group) and is NOT re-startable across sessions.
72          * 
73          * @param hostSet
74          *            The host used in the URL to Cambria. Entries can be
75          *            "host:port".
76          * @param topic
77          *            The topic to consume
78          * 
79          * @return a consumer
80          */
81         public static CambriaConsumer createConsumer(Collection<String> hostSet,
82                         String topic) {
83                 return createConsumer(hostSet, topic, null);
84         }
85
86         /**
87          * Create a consumer instance with server-side filtering, the default
88          * timeout, and no limit on messages returned. This consumer operates as an
89          * independent consumer (i.e., not in a group) and is NOT re-startable
90          * across sessions.
91          * 
92          * @param hostSet
93          *            The host used in the URL to Cambria. Entries can be
94          *            "host:port".
95          * @param topic
96          *            The topic to consume
97          * @param filter
98          *            a filter to use on the server side
99          * 
100          * @return a consumer
101          */
102         public static CambriaConsumer createConsumer(Collection<String> hostSet,
103                         String topic, String filter) {
104                 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
105                                 "0", -1, -1, filter, null, null);
106         }
107
108         /**
109          * Create a consumer instance with the default timeout, and no limit on
110          * messages returned. This consumer can operate in a logical group and is
111          * re-startable across sessions when you use the same group and ID on
112          * restart.
113          * 
114          * @param hostSet
115          *            The host used in the URL to Cambria. Entries can be
116          *            "host:port".
117          * @param topic
118          *            The topic to consume
119          * @param consumerGroup
120          *            The name of the consumer group this consumer is part of
121          * @param consumerId
122          *            The unique id of this consume in its group
123          * 
124          * @return a consumer
125          */
126         public static CambriaConsumer createConsumer(Collection<String> hostSet,
127                         final String topic, final String consumerGroup,
128                         final String consumerId) {
129                 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
130         }
131
132         /**
133          * Create a consumer instance with the default timeout, and no limit on
134          * messages returned. This consumer can operate in a logical group and is
135          * re-startable across sessions when you use the same group and ID on
136          * restart.
137          * 
138          * @param hostSet
139          *            The host used in the URL to Cambria. Entries can be
140          *            "host:port".
141          * @param topic
142          *            The topic to consume
143          * @param consumerGroup
144          *            The name of the consumer group this consumer is part of
145          * @param consumerId
146          *            The unique id of this consume in its group
147          * @param timeoutMs
148          *            The amount of time in milliseconds that the server should keep
149          *            the connection open while waiting for message traffic. Use -1
150          *            for default timeout.
151          * @param limit
152          *            A limit on the number of messages returned in a single call.
153          *            Use -1 for no limit.
154          * 
155          * @return a consumer
156          */
157         public static CambriaConsumer createConsumer(Collection<String> hostSet,
158                         final String topic, final String consumerGroup,
159                         final String consumerId, int timeoutMs, int limit) {
160                 return createConsumer(hostSet, topic, consumerGroup, consumerId,
161                                 timeoutMs, limit, null, null, null);
162         }
163
164         /**
165          * Create a consumer instance with the default timeout, and no limit on
166          * messages returned. This consumer can operate in a logical group and is
167          * re-startable across sessions when you use the same group and ID on
168          * restart. This consumer also uses server-side filtering.
169          * 
170          * @param hostList
171          *            A comma separated list of hosts to use to connect to Cambria.
172          *            You can include port numbers (3904 is the default). For
173          *            example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
174          * @param topic
175          *            The topic to consume
176          * @param consumerGroup
177          *            The name of the consumer group this consumer is part of
178          * @param consumerId
179          *            The unique id of this consume in its group
180          * @param timeoutMs
181          *            The amount of time in milliseconds that the server should keep
182          *            the connection open while waiting for message traffic. Use -1
183          *            for default timeout.
184          * @param limit
185          *            A limit on the number of messages returned in a single call.
186          *            Use -1 for no limit.
187          * @param filter
188          *            A Highland Park filter expression using only built-in filter
189          *            components. Use null for "no filter".
190          * @param apiKey
191          *            key associated with a user
192          * @param apiSecret
193          *            of a user
194          * 
195          * @return a consumer
196          */
197         public static CambriaConsumer createConsumer(String hostList,
198                         final String topic, final String consumerGroup,
199                         final String consumerId, int timeoutMs, int limit, String filter,
200                         String apiKey, String apiSecret) {
201                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
202                                 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
203                                 apiKey, apiSecret);
204         }
205
206         /**
207          * Create a consumer instance with the default timeout, and no limit on
208          * messages returned. This consumer can operate in a logical group and is
209          * re-startable across sessions when you use the same group and ID on
210          * restart. This consumer also uses server-side filtering.
211          * 
212          * @param hostSet
213          *            The host used in the URL to Cambria. Entries can be
214          *            "host:port".
215          * @param topic
216          *            The topic to consume
217          * @param consumerGroup
218          *            The name of the consumer group this consumer is part of
219          * @param consumerId
220          *            The unique id of this consume in its group
221          * @param timeoutMs
222          *            The amount of time in milliseconds that the server should keep
223          *            the connection open while waiting for message traffic. Use -1
224          *            for default timeout.
225          * @param limit
226          *            A limit on the number of messages returned in a single call.
227          *            Use -1 for no limit.
228          * @param filter
229          *            A Highland Park filter expression using only built-in filter
230          *            components. Use null for "no filter".
231          * @param apiKey
232          *            key associated with a user
233          * @param apiSecret
234          *            of a user
235          * @return a consumer
236          */
237         public static CambriaConsumer createConsumer(Collection<String> hostSet,
238                         final String topic, final String consumerGroup,
239                         final String consumerId, int timeoutMs, int limit, String filter,
240                         String apiKey, String apiSecret) {
241                 if (sfMock != null)
242                         return sfMock;
243                 try {
244                 return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
245                                 consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
246         } catch (MalformedURLException e) {
247                 throw new RuntimeException(e);
248         }
249         }
250
251         /*************************************************************************/
252         /*************************************************************************/
253         /*************************************************************************/
254
255         /**
256          * Create a publisher that sends each message (or group of messages)
257          * immediately. Most applications should favor higher latency for much
258          * higher message throughput and the "simple publisher" is not a good
259          * choice.
260          * 
261          * @param hostlist
262          *            The host used in the URL to Cambria. Can be "host:port", can
263          *            be multiple comma-separated entries.
264          * @param topic
265          *            The topic on which to publish messages.
266          * @return a publisher
267          */
268         public static CambriaBatchingPublisher createSimplePublisher(
269                         String hostlist, String topic) {
270                 return createBatchingPublisher(hostlist, topic, 1, 1);
271         }
272
273         /**
274          * Create a publisher that batches messages. Be sure to close the publisher
275          * to send the last batch and ensure a clean shutdown. Message payloads are
276          * not compressed.
277          * 
278          * @param hostlist
279          *            The host used in the URL to Cambria. Can be "host:port", can
280          *            be multiple comma-separated entries.
281          * @param topic
282          *            The topic on which to publish messages.
283          * @param maxBatchSize
284          *            The largest set of messages to batch
285          * @param maxAgeMs
286          *            The maximum age of a message waiting in a batch
287          * 
288          * @return a publisher
289          */
290         public static CambriaBatchingPublisher createBatchingPublisher(
291                         String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
292                 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
293                                 false);
294         }
295
296         /**
297          * Create a publisher that batches messages. Be sure to close the publisher
298          * to send the last batch and ensure a clean shutdown.
299          * 
300          * @param hostlist
301          *            The host used in the URL to Cambria. Can be "host:port", can
302          *            be multiple comma-separated entries.
303          * @param topic
304          *            The topic on which to publish messages.
305          * @param maxBatchSize
306          *            The largest set of messages to batch
307          * @param maxAgeMs
308          *            The maximum age of a message waiting in a batch
309          * @param compress
310          *            use gzip compression
311          * 
312          * @return a publisher
313          */
314         public static CambriaBatchingPublisher createBatchingPublisher(
315                         String hostlist, String topic, int maxBatchSize, long maxAgeMs,
316                         boolean compress) {
317                 return createBatchingPublisher(
318                                 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
319                                 maxBatchSize, maxAgeMs, compress);
320         }
321
322         /**
323          * Create a publisher that batches messages. Be sure to close the publisher
324          * to send the last batch and ensure a clean shutdown.
325          * 
326          * @param hostSet
327          *            A set of hosts to be used in the URL to Cambria. Can be
328          *            "host:port". Use multiple entries to enable failover.
329          * @param topic
330          *            The topic on which to publish messages.
331          * @param maxBatchSize
332          *            The largest set of messages to batch
333          * @param maxAgeMs
334          *            The maximum age of a message waiting in a batch
335          * @param compress
336          *            use gzip compression
337          * 
338          * @return a publisher
339          */
340         public static CambriaBatchingPublisher createBatchingPublisher(
341                         String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
342                         boolean compress) {
343                 final TreeSet<String> hosts = new TreeSet<String>();
344                 for (String hp : hostSet) {
345                         hosts.add(hp);
346                 }
347                 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
348                                 compress);
349         }
350
351         /**
352          * Create a publisher that batches messages. Be sure to close the publisher
353          * to send the last batch and ensure a clean shutdown.
354          * 
355          * @param hostSet
356          *            A set of hosts to be used in the URL to Cambria. Can be
357          *            "host:port". Use multiple entries to enable failover.
358          * @param topic
359          *            The topic on which to publish messages.
360          * @param maxBatchSize
361          *            The largest set of messages to batch
362          * @param maxAgeMs
363          *            The maximum age of a message waiting in a batch
364          * @param compress
365          *            use gzip compression
366          * 
367          * @return a publisher
368          */
369         public static CambriaBatchingPublisher createBatchingPublisher(
370                         Collection<String> hostSet, String topic, int maxBatchSize,
371                         long maxAgeMs, boolean compress) {
372                 return new DMaaPCambriaSimplerBatchPublisher.Builder()
373                                 .againstUrls(hostSet).onTopic(topic)
374                                 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
375         }
376
377         /**
378          * Create an identity manager client to work with API keys.
379          * 
380          * @param hostSet
381          *            A set of hosts to be used in the URL to Cambria. Can be
382          *            "host:port". Use multiple entries to enable failover.
383          * @param apiKey
384          *            Your API key
385          * @param apiSecret
386          *            Your API secret
387          * @return an identity manager
388          */
389         
390
391         /**
392          * Create a topic manager for working with topics.
393          * 
394          * @param hostSet
395          *            A set of hosts to be used in the URL to Cambria. Can be
396          *            "host:port". Use multiple entries to enable failover.
397          * @param apiKey
398          *            Your API key
399          * @param apiSecret
400          *            Your API secret
401          * @return a topic manager
402          */
403         
404
405         /**
406          * Inject a consumer. Used to support unit tests.
407          * 
408          * @param cc
409          */
410         public static void $testInject(CambriaConsumer cc) {
411                 sfMock = cc;
412         }
413
414         private static CambriaConsumer sfMock = null;
415 }