ff46ce3d1130373d6367fe566842176ed9333b30
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / metrics / publisher / DMaaPCambriaClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.metrics.publisher;
23
24 import java.net.MalformedURLException;
25 import java.nio.channels.NotYetConnectedException;
26 import java.util.Collection;
27 import java.util.TreeSet;
28 import java.util.UUID;
29
30 import org.onap.dmaap.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
31 import org.onap.dmaap.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
32
33 /**
34  * A factory for Cambria clients.<br/>
35  * <br/>
36  * Use caution selecting a consumer creator factory. If the call doesn't accept
37  * a consumer group name, then it creates a consumer that is not restartable.
38  * That is, if you stop your process and start it again, your client will NOT
39  * receive any missed messages on the topic. If you need to ensure receipt of
40  * missed messages, then you must use a consumer that's created with a group
41  * name and ID. (If you create multiple consumer processes using the same group,
42  * load is split across them. Be sure to use a different ID for each instance.)<br/>
43  * <br/>
44  * Publishers
45  * 
46  * @author peter
47  */
48 public class DMaaPCambriaClientFactory {
49         /**
50          * Create a consumer instance with the default timeout and no limit on
51          * messages returned. This consumer operates as an independent consumer
52          * (i.e., not in a group) and is NOT re-startable across sessions.
53          * 
54          * @param hostList
55          *            A comma separated list of hosts to use to connect to Cambria.
56          *            You can include port numbers (3904 is the default). For
57          *            example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
58          * 
59          * @param topic
60          *            The topic to consume
61          * 
62          * @return a consumer
63          */
64         public static CambriaConsumer createConsumer(String hostList, String topic) {
65                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
66                                 topic);
67         }
68
69         /**
70          * Create a consumer instance with the default timeout and no limit on
71          * messages returned. This consumer operates as an independent consumer
72          * (i.e., not in a group) and is NOT re-startable across sessions.
73          * 
74          * @param hostSet
75          *            The host used in the URL to Cambria. Entries can be
76          *            "host:port".
77          * @param topic
78          *            The topic to consume
79          * 
80          * @return a consumer
81          */
82         public static CambriaConsumer createConsumer(Collection<String> hostSet,
83                         String topic) {
84                 return createConsumer(hostSet, topic, null);
85         }
86
87         /**
88          * Create a consumer instance with server-side filtering, the default
89          * timeout, and no limit on messages returned. This consumer operates as an
90          * independent consumer (i.e., not in a group) and is NOT re-startable
91          * across sessions.
92          * 
93          * @param hostSet
94          *            The host used in the URL to Cambria. Entries can be
95          *            "host:port".
96          * @param topic
97          *            The topic to consume
98          * @param filter
99          *            a filter to use on the server side
100          * 
101          * @return a consumer
102          */
103         public static CambriaConsumer createConsumer(Collection<String> hostSet,
104                         String topic, String filter) {
105                 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
106                                 "0", -1, -1, filter, null, null);
107         }
108
109         /**
110          * Create a consumer instance with the default timeout, and no limit on
111          * messages returned. This consumer can operate in a logical group and is
112          * re-startable across sessions when you use the same group and ID on
113          * restart.
114          * 
115          * @param hostSet
116          *            The host used in the URL to Cambria. Entries can be
117          *            "host:port".
118          * @param topic
119          *            The topic to consume
120          * @param consumerGroup
121          *            The name of the consumer group this consumer is part of
122          * @param consumerId
123          *            The unique id of this consume in its group
124          * 
125          * @return a consumer
126          */
127         public static CambriaConsumer createConsumer(Collection<String> hostSet,
128                         final String topic, final String consumerGroup,
129                         final String consumerId) {
130                 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
131         }
132
133         /**
134          * Create a consumer instance with the default timeout, and no limit on
135          * messages returned. This consumer can operate in a logical group and is
136          * re-startable across sessions when you use the same group and ID on
137          * restart.
138          * 
139          * @param hostSet
140          *            The host used in the URL to Cambria. Entries can be
141          *            "host:port".
142          * @param topic
143          *            The topic to consume
144          * @param consumerGroup
145          *            The name of the consumer group this consumer is part of
146          * @param consumerId
147          *            The unique id of this consume in its group
148          * @param timeoutMs
149          *            The amount of time in milliseconds that the server should keep
150          *            the connection open while waiting for message traffic. Use -1
151          *            for default timeout.
152          * @param limit
153          *            A limit on the number of messages returned in a single call.
154          *            Use -1 for no limit.
155          * 
156          * @return a consumer
157          */
158         public static CambriaConsumer createConsumer(Collection<String> hostSet,
159                         final String topic, final String consumerGroup,
160                         final String consumerId, int timeoutMs, int limit) {
161                 return createConsumer(hostSet, topic, consumerGroup, consumerId,
162                                 timeoutMs, limit, null, null, null);
163         }
164
165         /**
166          * Create a consumer instance with the default timeout, and no limit on
167          * messages returned. This consumer can operate in a logical group and is
168          * re-startable across sessions when you use the same group and ID on
169          * restart. This consumer also uses server-side filtering.
170          * 
171          * @param hostList
172          *            A comma separated list of hosts to use to connect to Cambria.
173          *            You can include port numbers (3904 is the default). For
174          *            example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
175          * @param topic
176          *            The topic to consume
177          * @param consumerGroup
178          *            The name of the consumer group this consumer is part of
179          * @param consumerId
180          *            The unique id of this consume in its group
181          * @param timeoutMs
182          *            The amount of time in milliseconds that the server should keep
183          *            the connection open while waiting for message traffic. Use -1
184          *            for default timeout.
185          * @param limit
186          *            A limit on the number of messages returned in a single call.
187          *            Use -1 for no limit.
188          * @param filter
189          *            A Highland Park filter expression using only built-in filter
190          *            components. Use null for "no filter".
191          * @param apiKey
192          *            key associated with a user
193          * @param apiSecret
194          *            of a user
195          * 
196          * @return a consumer
197          */
198         public static CambriaConsumer createConsumer(String hostList,
199                         final String topic, final String consumerGroup,
200                         final String consumerId, int timeoutMs, int limit, String filter,
201                         String apiKey, String apiSecret) {
202                 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
203                                 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
204                                 apiKey, apiSecret);
205         }
206
207         /**
208          * Create a consumer instance with the default timeout, and no limit on
209          * messages returned. This consumer can operate in a logical group and is
210          * re-startable across sessions when you use the same group and ID on
211          * restart. This consumer also uses server-side filtering.
212          * 
213          * @param hostSet
214          *            The host used in the URL to Cambria. Entries can be
215          *            "host:port".
216          * @param topic
217          *            The topic to consume
218          * @param consumerGroup
219          *            The name of the consumer group this consumer is part of
220          * @param consumerId
221          *            The unique id of this consume in its group
222          * @param timeoutMs
223          *            The amount of time in milliseconds that the server should keep
224          *            the connection open while waiting for message traffic. Use -1
225          *            for default timeout.
226          * @param limit
227          *            A limit on the number of messages returned in a single call.
228          *            Use -1 for no limit.
229          * @param filter
230          *            A Highland Park filter expression using only built-in filter
231          *            components. Use null for "no filter".
232          * @param apiKey
233          *            key associated with a user
234          * @param apiSecret
235          *            of a user
236          * @return a consumer
237          */
238         public static CambriaConsumer createConsumer(Collection<String> hostSet,
239                         final String topic, final String consumerGroup,
240                         final String consumerId, int timeoutMs, int limit, String filter,
241                         String apiKey, String apiSecret) {
242                 if (sfMock != null)
243                         return sfMock;
244                 try {
245                 return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
246                                 consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
247         } catch (MalformedURLException e) {
248                 
249                 NotYetConnectedException exception=new NotYetConnectedException();
250                 exception.setStackTrace(e.getStackTrace());
251                 
252                 throw exception ;
253         }
254         }
255
256         /*************************************************************************/
257         /*************************************************************************/
258         /*************************************************************************/
259
260         /**
261          * Create a publisher that sends each message (or group of messages)
262          * immediately. Most applications should favor higher latency for much
263          * higher message throughput and the "simple publisher" is not a good
264          * choice.
265          * 
266          * @param hostlist
267          *            The host used in the URL to Cambria. Can be "host:port", can
268          *            be multiple comma-separated entries.
269          * @param topic
270          *            The topic on which to publish messages.
271          * @return a publisher
272          */
273         public static CambriaBatchingPublisher createSimplePublisher(
274                         String hostlist, String topic) {
275                 return createBatchingPublisher(hostlist, topic, 1, 1);
276         }
277
278         /**
279          * Create a publisher that batches messages. Be sure to close the publisher
280          * to send the last batch and ensure a clean shutdown. Message payloads are
281          * not compressed.
282          * 
283          * @param hostlist
284          *            The host used in the URL to Cambria. Can be "host:port", can
285          *            be multiple comma-separated entries.
286          * @param topic
287          *            The topic on which to publish messages.
288          * @param maxBatchSize
289          *            The largest set of messages to batch
290          * @param maxAgeMs
291          *            The maximum age of a message waiting in a batch
292          * 
293          * @return a publisher
294          */
295         public static CambriaBatchingPublisher createBatchingPublisher(
296                         String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
297                 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
298                                 false);
299         }
300
301         /**
302          * Create a publisher that batches messages. Be sure to close the publisher
303          * to send the last batch and ensure a clean shutdown.
304          * 
305          * @param hostlist
306          *            The host used in the URL to Cambria. Can be "host:port", can
307          *            be multiple comma-separated entries.
308          * @param topic
309          *            The topic on which to publish messages.
310          * @param maxBatchSize
311          *            The largest set of messages to batch
312          * @param maxAgeMs
313          *            The maximum age of a message waiting in a batch
314          * @param compress
315          *            use gzip compression
316          * 
317          * @return a publisher
318          */
319         public static CambriaBatchingPublisher createBatchingPublisher(
320                         String hostlist, String topic, int maxBatchSize, long maxAgeMs,
321                         boolean compress) {
322                 return createBatchingPublisher(
323                                 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
324                                 maxBatchSize, maxAgeMs, compress);
325         }
326
327         /**
328          * Create a publisher that batches messages. Be sure to close the publisher
329          * to send the last batch and ensure a clean shutdown.
330          * 
331          * @param hostSet
332          *            A set of hosts to be used in the URL to Cambria. Can be
333          *            "host:port". Use multiple entries to enable failover.
334          * @param topic
335          *            The topic on which to publish messages.
336          * @param maxBatchSize
337          *            The largest set of messages to batch
338          * @param maxAgeMs
339          *            The maximum age of a message waiting in a batch
340          * @param compress
341          *            use gzip compression
342          * 
343          * @return a publisher
344          */
345         public static CambriaBatchingPublisher createBatchingPublisher(
346                         String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
347                         boolean compress) {
348                 final TreeSet<String> hosts = new TreeSet<String>();
349                 for (String hp : hostSet) {
350                         hosts.add(hp);
351                 }
352                 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
353                                 compress);
354         }
355
356         /**
357          * Create a publisher that batches messages. Be sure to close the publisher
358          * to send the last batch and ensure a clean shutdown.
359          * 
360          * @param hostSet
361          *            A set of hosts to be used in the URL to Cambria. Can be
362          *            "host:port". Use multiple entries to enable failover.
363          * @param topic
364          *            The topic on which to publish messages.
365          * @param maxBatchSize
366          *            The largest set of messages to batch
367          * @param maxAgeMs
368          *            The maximum age of a message waiting in a batch
369          * @param compress
370          *            use gzip compression
371          * 
372          * @return a publisher
373          */
374         public static CambriaBatchingPublisher createBatchingPublisher(
375                         Collection<String> hostSet, String topic, int maxBatchSize,
376                         long maxAgeMs, boolean compress) {
377                 return new DMaaPCambriaSimplerBatchPublisher.Builder()
378                                 .againstUrls(hostSet).onTopic(topic)
379                                 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
380         }
381
382         /**
383          * Create an identity manager client to work with API keys.
384          * 
385          * @param hostSet
386          *            A set of hosts to be used in the URL to Cambria. Can be
387          *            "host:port". Use multiple entries to enable failover.
388          * @param apiKey
389          *            Your API key
390          * @param apiSecret
391          *            Your API secret
392          * @return an identity manager
393          */
394         
395
396         /**
397          * Create a topic manager for working with topics.
398          * 
399          * @param hostSet
400          *            A set of hosts to be used in the URL to Cambria. Can be
401          *            "host:port". Use multiple entries to enable failover.
402          * @param apiKey
403          *            Your API key
404          * @param apiSecret
405          *            Your API secret
406          * @return a topic manager
407          */
408         
409
410         /**
411          * Inject a consumer. Used to support unit tests.
412          * 
413          * @param cc
414          */
415         public static void $testInject(CambriaConsumer cc) {
416                 sfMock = cc;
417         }
418
419         private static CambriaConsumer sfMock = null;
420 }