1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.metrics.publisher;
24 import java.net.MalformedURLException;
25 import java.util.Collection;
26 import java.util.TreeSet;
27 import java.util.UUID;
29 import com.att.nsa.cambria.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
30 import com.att.nsa.cambria.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
33 * A factory for Cambria clients.<br/>
35 * Use caution selecting a consumer creator factory. If the call doesn't accept
36 * a consumer group name, then it creates a consumer that is not restartable.
37 * That is, if you stop your process and start it again, your client will NOT
38 * receive any missed messages on the topic. If you need to ensure receipt of
39 * missed messages, then you must use a consumer that's created with a group
40 * name and ID. (If you create multiple consumer processes using the same group,
41 * load is split across them. Be sure to use a different ID for each instance.)<br/>
47 public class DMaaPCambriaClientFactory {
49 * Create a consumer instance with the default timeout and no limit on
50 * messages returned. This consumer operates as an independent consumer
51 * (i.e., not in a group) and is NOT re-startable across sessions.
54 * A comma separated list of hosts to use to connect to Cambria.
55 * You can include port numbers (3904 is the default).
58 * The topic to consume
62 public static CambriaConsumer createConsumer(String hostList, String topic) {
63 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
68 * Create a consumer instance with the default timeout and no limit on
69 * messages returned. This consumer operates as an independent consumer
70 * (i.e., not in a group) and is NOT re-startable across sessions.
73 * The host used in the URL to Cambria. Entries can be
76 * The topic to consume
80 public static CambriaConsumer createConsumer(Collection<String> hostSet,
82 return createConsumer(hostSet, topic, null);
86 * Create a consumer instance with server-side filtering, the default
87 * timeout, and no limit on messages returned. This consumer operates as an
88 * independent consumer (i.e., not in a group) and is NOT re-startable
92 * The host used in the URL to Cambria. Entries can be
95 * The topic to consume
97 * a filter to use on the server side
101 public static CambriaConsumer createConsumer(Collection<String> hostSet,
102 String topic, String filter) {
103 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
104 "0", -1, -1, filter, null, null);
108 * Create a consumer instance with the default timeout, and no limit on
109 * messages returned. This consumer can operate in a logical group and is
110 * re-startable across sessions when you use the same group and ID on
114 * The host used in the URL to Cambria. Entries can be
117 * The topic to consume
118 * @param consumerGroup
119 * The name of the consumer group this consumer is part of
121 * The unique id of this consume in its group
125 public static CambriaConsumer createConsumer(Collection<String> hostSet,
126 final String topic, final String consumerGroup,
127 final String consumerId) {
128 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
132 * Create a consumer instance with the default timeout, and no limit on
133 * messages returned. This consumer can operate in a logical group and is
134 * re-startable across sessions when you use the same group and ID on
138 * The host used in the URL to Cambria. Entries can be
141 * The topic to consume
142 * @param consumerGroup
143 * The name of the consumer group this consumer is part of
145 * The unique id of this consume in its group
147 * The amount of time in milliseconds that the server should keep
148 * the connection open while waiting for message traffic. Use -1
149 * for default timeout.
151 * A limit on the number of messages returned in a single call.
152 * Use -1 for no limit.
156 public static CambriaConsumer createConsumer(Collection<String> hostSet,
157 final String topic, final String consumerGroup,
158 final String consumerId, int timeoutMs, int limit) {
159 return createConsumer(hostSet, topic, consumerGroup, consumerId,
160 timeoutMs, limit, null, null, null);
164 * Create a consumer instance with the default timeout, and no limit on
165 * messages returned. This consumer can operate in a logical group and is
166 * re-startable across sessions when you use the same group and ID on
167 * restart. This consumer also uses server-side filtering.
170 * A comma separated list of hosts to use to connect to Cambria.
171 * You can include port numbers (3904 is the default).
173 * The topic to consume
174 * @param consumerGroup
175 * The name of the consumer group this consumer is part of
177 * The unique id of this consume in its group
179 * The amount of time in milliseconds that the server should keep
180 * the connection open while waiting for message traffic. Use -1
181 * for default timeout.
183 * A limit on the number of messages returned in a single call.
184 * Use -1 for no limit.
186 * A Highland Park filter expression using only built-in filter
187 * components. Use null for "no filter".
189 * key associated with a user
195 public static CambriaConsumer createConsumer(String hostList,
196 final String topic, final String consumerGroup,
197 final String consumerId, int timeoutMs, int limit, String filter,
198 String apiKey, String apiSecret) {
199 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
200 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
205 * Create a consumer instance with the default timeout, and no limit on
206 * messages returned. This consumer can operate in a logical group and is
207 * re-startable across sessions when you use the same group and ID on
208 * restart. This consumer also uses server-side filtering.
211 * The host used in the URL to Cambria. Entries can be
214 * The topic to consume
215 * @param consumerGroup
216 * The name of the consumer group this consumer is part of
218 * The unique id of this consume in its group
220 * The amount of time in milliseconds that the server should keep
221 * the connection open while waiting for message traffic. Use -1
222 * for default timeout.
224 * A limit on the number of messages returned in a single call.
225 * Use -1 for no limit.
227 * A Highland Park filter expression using only built-in filter
228 * components. Use null for "no filter".
230 * key associated with a user
235 public static CambriaConsumer createConsumer(Collection<String> hostSet,
236 final String topic, final String consumerGroup,
237 final String consumerId, int timeoutMs, int limit, String filter,
238 String apiKey, String apiSecret) {
242 return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
243 consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
244 } catch (MalformedURLException e) {
245 throw new RuntimeException(e);
249 /*************************************************************************/
250 /*************************************************************************/
251 /*************************************************************************/
254 * Create a publisher that sends each message (or group of messages)
255 * immediately. Most applications should favor higher latency for much
256 * higher message throughput and the "simple publisher" is not a good
260 * The host used in the URL to Cambria. Can be "host:port", can
261 * be multiple comma-separated entries.
263 * The topic on which to publish messages.
264 * @return a publisher
266 public static CambriaBatchingPublisher createSimplePublisher(
267 String hostlist, String topic) {
268 return createBatchingPublisher(hostlist, topic, 1, 1);
272 * Create a publisher that batches messages. Be sure to close the publisher
273 * to send the last batch and ensure a clean shutdown. Message payloads are
277 * The host used in the URL to Cambria. Can be "host:port", can
278 * be multiple comma-separated entries.
280 * The topic on which to publish messages.
281 * @param maxBatchSize
282 * The largest set of messages to batch
284 * The maximum age of a message waiting in a batch
286 * @return a publisher
288 public static CambriaBatchingPublisher createBatchingPublisher(
289 String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
290 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
295 * Create a publisher that batches messages. Be sure to close the publisher
296 * to send the last batch and ensure a clean shutdown.
299 * The host used in the URL to Cambria. Can be "host:port", can
300 * be multiple comma-separated entries.
302 * The topic on which to publish messages.
303 * @param maxBatchSize
304 * The largest set of messages to batch
306 * The maximum age of a message waiting in a batch
308 * use gzip compression
310 * @return a publisher
312 public static CambriaBatchingPublisher createBatchingPublisher(
313 String hostlist, String topic, int maxBatchSize, long maxAgeMs,
315 return createBatchingPublisher(
316 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
317 maxBatchSize, maxAgeMs, compress);
321 * Create a publisher that batches messages. Be sure to close the publisher
322 * to send the last batch and ensure a clean shutdown.
325 * A set of hosts to be used in the URL to Cambria. Can be
326 * "host:port". Use multiple entries to enable failover.
328 * The topic on which to publish messages.
329 * @param maxBatchSize
330 * The largest set of messages to batch
332 * The maximum age of a message waiting in a batch
334 * use gzip compression
336 * @return a publisher
338 public static CambriaBatchingPublisher createBatchingPublisher(
339 String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
341 final TreeSet<String> hosts = new TreeSet<String>();
342 for (String hp : hostSet) {
345 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
350 * Create a publisher that batches messages. Be sure to close the publisher
351 * to send the last batch and ensure a clean shutdown.
354 * A set of hosts to be used in the URL to Cambria. Can be
355 * "host:port". Use multiple entries to enable failover.
357 * The topic on which to publish messages.
358 * @param maxBatchSize
359 * The largest set of messages to batch
361 * The maximum age of a message waiting in a batch
363 * use gzip compression
365 * @return a publisher
367 public static CambriaBatchingPublisher createBatchingPublisher(
368 Collection<String> hostSet, String topic, int maxBatchSize,
369 long maxAgeMs, boolean compress) {
370 return new DMaaPCambriaSimplerBatchPublisher.Builder()
371 .againstUrls(hostSet).onTopic(topic)
372 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
376 * Create an identity manager client to work with API keys.
379 * A set of hosts to be used in the URL to Cambria. Can be
380 * "host:port". Use multiple entries to enable failover.
385 * @return an identity manager
388 * public static CambriaIdentityManager createIdentityManager (
389 * Collection<String> hostSet, String apiKey, String apiSecret ) { final
390 * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet );
391 * cim.setApiCredentials ( apiKey, apiSecret ); return cim; }
395 * Create a topic manager for working with topics.
398 * A set of hosts to be used in the URL to Cambria. Can be
399 * "host:port". Use multiple entries to enable failover.
404 * @return a topic manager
407 * public static CambriaTopicManager createTopicManager ( Collection<String>
408 * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi
409 * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey,
410 * apiSecret ); return tmi; }
414 * Inject a consumer. Used to support unit tests.
418 public static void $testInject(CambriaConsumer cc) {
422 private static CambriaConsumer sfMock = null;