clean MR codebase
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / metrics / publisher / DMaaPCambriaClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.metrics.publisher;
23
24 import java.net.MalformedURLException;
25 import java.nio.channels.NotYetConnectedException;
26 import java.util.Collection;
27 import java.util.TreeSet;
28 import java.util.UUID;
29
30 import org.onap.dmaap.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
31 import org.onap.dmaap.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
32
33 /**
34  * A factory for Cambria clients.<br/>
35  * <br/>
36  * Use caution selecting a consumer creator factory. If the call doesn't accept
37  * a consumer group name, then it creates a consumer that is not restartable.
38  * That is, if you stop your process and start it again, your client will NOT
39  * receive any missed messages on the topic. If you need to ensure receipt of
40  * missed messages, then you must use a consumer that's created with a group
41  * name and ID. (If you create multiple consumer processes using the same group,
42  * load is split across them. Be sure to use a different ID for each instance.)<br/>
43  * <br/>
44  * Publishers
45  * 
46  * @author peter
47  */
48 public class DMaaPCambriaClientFactory {
49         /**
50          * Create a consumer instance with the default timeout and no limit on
51          * messages returned. This consumer operates as an independent consumer
52          * (i.e., not in a group) and is NOT re-startable across sessions.
53          * 
54          * @param hostList
55          *            A comma separated list of hosts to use to connect to Cambria.
56          *            You can include port numbers (3904 is the default). For
57          * 
58          * @param topic
59          *            The topic to consume
60          * 
61          * @return a consumer
62          */
63         public static CambriaConsumer createConsumer(String hostList, String topic) {
64                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
65                                 topic);
66         }
67
68         /**
69          * Create a consumer instance with the default timeout and no limit on
70          * messages returned. This consumer operates as an independent consumer
71          * (i.e., not in a group) and is NOT re-startable across sessions.
72          * 
73          * @param hostSet
74          *            The host used in the URL to Cambria. Entries can be
75          *            "host:port".
76          * @param topic
77          *            The topic to consume
78          * 
79          * @return a consumer
80          */
81         public static CambriaConsumer createConsumer(Collection<String> hostSet,
82                         String topic) {
83                 return createConsumer(hostSet, topic, null);
84         }
85
86         /**
87          * Create a consumer instance with server-side filtering, the default
88          * timeout, and no limit on messages returned. This consumer operates as an
89          * independent consumer (i.e., not in a group) and is NOT re-startable
90          * across sessions.
91          * 
92          * @param hostSet
93          *            The host used in the URL to Cambria. Entries can be
94          *            "host:port".
95          * @param topic
96          *            The topic to consume
97          * @param filter
98          *            a filter to use on the server side
99          * 
100          * @return a consumer
101          */
102         public static CambriaConsumer createConsumer(Collection<String> hostSet,
103                         String topic, String filter) {
104                 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
105                                 "0", -1, -1, filter, null, null);
106         }
107
108         /**
109          * Create a consumer instance with the default timeout, and no limit on
110          * messages returned. This consumer can operate in a logical group and is
111          * re-startable across sessions when you use the same group and ID on
112          * restart.
113          * 
114          * @param hostSet
115          *            The host used in the URL to Cambria. Entries can be
116          *            "host:port".
117          * @param topic
118          *            The topic to consume
119          * @param consumerGroup
120          *            The name of the consumer group this consumer is part of
121          * @param consumerId
122          *            The unique id of this consume in its group
123          * 
124          * @return a consumer
125          */
126         public static CambriaConsumer createConsumer(Collection<String> hostSet,
127                         final String topic, final String consumerGroup,
128                         final String consumerId) {
129                 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
130         }
131
132         /**
133          * Create a consumer instance with the default timeout, and no limit on
134          * messages returned. This consumer can operate in a logical group and is
135          * re-startable across sessions when you use the same group and ID on
136          * restart.
137          * 
138          * @param hostSet
139          *            The host used in the URL to Cambria. Entries can be
140          *            "host:port".
141          * @param topic
142          *            The topic to consume
143          * @param consumerGroup
144          *            The name of the consumer group this consumer is part of
145          * @param consumerId
146          *            The unique id of this consume in its group
147          * @param timeoutMs
148          *            The amount of time in milliseconds that the server should keep
149          *            the connection open while waiting for message traffic. Use -1
150          *            for default timeout.
151          * @param limit
152          *            A limit on the number of messages returned in a single call.
153          *            Use -1 for no limit.
154          * 
155          * @return a consumer
156          */
157         public static CambriaConsumer createConsumer(Collection<String> hostSet,
158                         final String topic, final String consumerGroup,
159                         final String consumerId, int timeoutMs, int limit) {
160                 return createConsumer(hostSet, topic, consumerGroup, consumerId,
161                                 timeoutMs, limit, null, null, null);
162         }
163
164         /**
165          * Create a consumer instance with the default timeout, and no limit on
166          * messages returned. This consumer can operate in a logical group and is
167          * re-startable across sessions when you use the same group and ID on
168          * restart. This consumer also uses server-side filtering.
169          * 
170          * @param hostList
171          *            A comma separated list of hosts to use to connect to Cambria.
172          *            You can include port numbers (3904 is the default). For
173          * @param topic
174          *            The topic to consume
175          * @param consumerGroup
176          *            The name of the consumer group this consumer is part of
177          * @param consumerId
178          *            The unique id of this consume in its group
179          * @param timeoutMs
180          *            The amount of time in milliseconds that the server should keep
181          *            the connection open while waiting for message traffic. Use -1
182          *            for default timeout.
183          * @param limit
184          *            A limit on the number of messages returned in a single call.
185          *            Use -1 for no limit.
186          * @param filter
187          *            A Highland Park filter expression using only built-in filter
188          *            components. Use null for "no filter".
189          * @param apiKey
190          *            key associated with a user
191          * @param apiSecret
192          *            of a user
193          * 
194          * @return a consumer
195          */
196         public static CambriaConsumer createConsumer(String hostList,
197                         final String topic, final String consumerGroup,
198                         final String consumerId, int timeoutMs, int limit, String filter,
199                         String apiKey, String apiSecret) {
200                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
201                                 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
202                                 apiKey, apiSecret);
203         }
204
205         /**
206          * Create a consumer instance with the default timeout, and no limit on
207          * messages returned. This consumer can operate in a logical group and is
208          * re-startable across sessions when you use the same group and ID on
209          * restart. This consumer also uses server-side filtering.
210          * 
211          * @param hostSet
212          *            The host used in the URL to Cambria. Entries can be
213          *            "host:port".
214          * @param topic
215          *            The topic to consume
216          * @param consumerGroup
217          *            The name of the consumer group this consumer is part of
218          * @param consumerId
219          *            The unique id of this consume in its group
220          * @param timeoutMs
221          *            The amount of time in milliseconds that the server should keep
222          *            the connection open while waiting for message traffic. Use -1
223          *            for default timeout.
224          * @param limit
225          *            A limit on the number of messages returned in a single call.
226          *            Use -1 for no limit.
227          * @param filter
228          *            A Highland Park filter expression using only built-in filter
229          *            components. Use null for "no filter".
230          * @param apiKey
231          *            key associated with a user
232          * @param apiSecret
233          *            of a user
234          * @return a consumer
235          */
236         public static CambriaConsumer createConsumer(Collection<String> hostSet,
237                         final String topic, final String consumerGroup,
238                         final String consumerId, int timeoutMs, int limit, String filter,
239                         String apiKey, String apiSecret) {
240                 if (sfMock != null)
241                         return sfMock;
242                 try {
243                 return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
244                                 consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
245         } catch (MalformedURLException e) {
246                 
247                 NotYetConnectedException exception=new NotYetConnectedException();
248                 exception.setStackTrace(e.getStackTrace());
249                 
250                 throw exception ;
251         }
252         }
253
254         /*************************************************************************/
255         /*************************************************************************/
256         /*************************************************************************/
257
258         /**
259          * Create a publisher that sends each message (or group of messages)
260          * immediately. Most applications should favor higher latency for much
261          * higher message throughput and the "simple publisher" is not a good
262          * choice.
263          * 
264          * @param hostlist
265          *            The host used in the URL to Cambria. Can be "host:port", can
266          *            be multiple comma-separated entries.
267          * @param topic
268          *            The topic on which to publish messages.
269          * @return a publisher
270          */
271         public static CambriaBatchingPublisher createSimplePublisher(
272                         String hostlist, String topic) {
273                 return createBatchingPublisher(hostlist, topic, 1, 1);
274         }
275
276         /**
277          * Create a publisher that batches messages. Be sure to close the publisher
278          * to send the last batch and ensure a clean shutdown. Message payloads are
279          * not compressed.
280          * 
281          * @param hostlist
282          *            The host used in the URL to Cambria. Can be "host:port", can
283          *            be multiple comma-separated entries.
284          * @param topic
285          *            The topic on which to publish messages.
286          * @param maxBatchSize
287          *            The largest set of messages to batch
288          * @param maxAgeMs
289          *            The maximum age of a message waiting in a batch
290          * 
291          * @return a publisher
292          */
293         public static CambriaBatchingPublisher createBatchingPublisher(
294                         String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
295                 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
296                                 false);
297         }
298
299         /**
300          * Create a publisher that batches messages. Be sure to close the publisher
301          * to send the last batch and ensure a clean shutdown.
302          * 
303          * @param hostlist
304          *            The host used in the URL to Cambria. Can be "host:port", can
305          *            be multiple comma-separated entries.
306          * @param topic
307          *            The topic on which to publish messages.
308          * @param maxBatchSize
309          *            The largest set of messages to batch
310          * @param maxAgeMs
311          *            The maximum age of a message waiting in a batch
312          * @param compress
313          *            use gzip compression
314          * 
315          * @return a publisher
316          */
317         public static CambriaBatchingPublisher createBatchingPublisher(
318                         String hostlist, String topic, int maxBatchSize, long maxAgeMs,
319                         boolean compress) {
320                 return createBatchingPublisher(
321                                 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
322                                 maxBatchSize, maxAgeMs, compress);
323         }
324
325         /**
326          * Create a publisher that batches messages. Be sure to close the publisher
327          * to send the last batch and ensure a clean shutdown.
328          * 
329          * @param hostSet
330          *            A set of hosts to be used in the URL to Cambria. Can be
331          *            "host:port". Use multiple entries to enable failover.
332          * @param topic
333          *            The topic on which to publish messages.
334          * @param maxBatchSize
335          *            The largest set of messages to batch
336          * @param maxAgeMs
337          *            The maximum age of a message waiting in a batch
338          * @param compress
339          *            use gzip compression
340          * 
341          * @return a publisher
342          */
343         public static CambriaBatchingPublisher createBatchingPublisher(
344                         String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
345                         boolean compress) {
346                 final TreeSet<String> hosts = new TreeSet<String>();
347                 for (String hp : hostSet) {
348                         hosts.add(hp);
349                 }
350                 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
351                                 compress);
352         }
353
354         /**
355          * Create a publisher that batches messages. Be sure to close the publisher
356          * to send the last batch and ensure a clean shutdown.
357          * 
358          * @param hostSet
359          *            A set of hosts to be used in the URL to Cambria. Can be
360          *            "host:port". Use multiple entries to enable failover.
361          * @param topic
362          *            The topic on which to publish messages.
363          * @param maxBatchSize
364          *            The largest set of messages to batch
365          * @param maxAgeMs
366          *            The maximum age of a message waiting in a batch
367          * @param compress
368          *            use gzip compression
369          * 
370          * @return a publisher
371          */
372         public static CambriaBatchingPublisher createBatchingPublisher(
373                         Collection<String> hostSet, String topic, int maxBatchSize,
374                         long maxAgeMs, boolean compress) {
375                 return new DMaaPCambriaSimplerBatchPublisher.Builder()
376                                 .againstUrls(hostSet).onTopic(topic)
377                                 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
378         }
379
380         /**
381          * Create an identity manager client to work with API keys.
382          * 
383          * @param hostSet
384          *            A set of hosts to be used in the URL to Cambria. Can be
385          *            "host:port". Use multiple entries to enable failover.
386          * @param apiKey
387          *            Your API key
388          * @param apiSecret
389          *            Your API secret
390          * @return an identity manager
391          */
392         
393
394         /**
395          * Create a topic manager for working with topics.
396          * 
397          * @param hostSet
398          *            A set of hosts to be used in the URL to Cambria. Can be
399          *            "host:port". Use multiple entries to enable failover.
400          * @param apiKey
401          *            Your API key
402          * @param apiSecret
403          *            Your API secret
404          * @return a topic manager
405          */
406         
407
408         /**
409          * Inject a consumer. Used to support unit tests.
410          * 
411          * @param cc
412          */
413         public static void $testInject(CambriaConsumer cc) {
414                 sfMock = cc;
415         }
416
417         private static CambriaConsumer sfMock = null;
418 }