1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.metrics.publisher;
24 import java.net.MalformedURLException;
25 import java.util.Collection;
26 import java.util.TreeSet;
27 import java.util.UUID;
29 import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
30 import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
33 * A factory for Cambria clients.<br/>
35 * Use caution selecting a consumer creator factory. If the call doesn't accept
36 * a consumer group name, then it creates a consumer that is not restartable.
37 * That is, if you stop your process and start it again, your client will NOT
38 * receive any missed messages on the topic. If you need to ensure receipt of
39 * missed messages, then you must use a consumer that's created with a group
40 * name and ID. (If you create multiple consumer processes using the same group,
41 * load is split across them. Be sure to use a different ID for each instance.)<br/>
47 public class DMaaPCambriaClientFactory {
49 * Create a consumer instance with the default timeout and no limit on
50 * messages returned. This consumer operates as an independent consumer
51 * (i.e., not in a group) and is NOT re-startable across sessions.
54 * A comma separated list of hosts to use to connect to Cambria.
55 * You can include port numbers (3904 is the default). For
56 * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
59 * The topic to consume
63 public static CambriaConsumer createConsumer(String hostList, String topic) {
64 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
69 * Create a consumer instance with the default timeout and no limit on
70 * messages returned. This consumer operates as an independent consumer
71 * (i.e., not in a group) and is NOT re-startable across sessions.
74 * The host used in the URL to Cambria. Entries can be
77 * The topic to consume
81 public static CambriaConsumer createConsumer(Collection<String> hostSet,
83 return createConsumer(hostSet, topic, null);
87 * Create a consumer instance with server-side filtering, the default
88 * timeout, and no limit on messages returned. This consumer operates as an
89 * independent consumer (i.e., not in a group) and is NOT re-startable
93 * The host used in the URL to Cambria. Entries can be
96 * The topic to consume
98 * a filter to use on the server side
102 public static CambriaConsumer createConsumer(Collection<String> hostSet,
103 String topic, String filter) {
104 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
105 "0", -1, -1, filter, null, null);
109 * Create a consumer instance with the default timeout, and no limit on
110 * messages returned. This consumer can operate in a logical group and is
111 * re-startable across sessions when you use the same group and ID on
115 * The host used in the URL to Cambria. Entries can be
118 * The topic to consume
119 * @param consumerGroup
120 * The name of the consumer group this consumer is part of
122 * The unique id of this consume in its group
126 public static CambriaConsumer createConsumer(Collection<String> hostSet,
127 final String topic, final String consumerGroup,
128 final String consumerId) {
129 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
133 * Create a consumer instance with the default timeout, and no limit on
134 * messages returned. This consumer can operate in a logical group and is
135 * re-startable across sessions when you use the same group and ID on
139 * The host used in the URL to Cambria. Entries can be
142 * The topic to consume
143 * @param consumerGroup
144 * The name of the consumer group this consumer is part of
146 * The unique id of this consume in its group
148 * The amount of time in milliseconds that the server should keep
149 * the connection open while waiting for message traffic. Use -1
150 * for default timeout.
152 * A limit on the number of messages returned in a single call.
153 * Use -1 for no limit.
157 public static CambriaConsumer createConsumer(Collection<String> hostSet,
158 final String topic, final String consumerGroup,
159 final String consumerId, int timeoutMs, int limit) {
160 return createConsumer(hostSet, topic, consumerGroup, consumerId,
161 timeoutMs, limit, null, null, null);
165 * Create a consumer instance with the default timeout, and no limit on
166 * messages returned. This consumer can operate in a logical group and is
167 * re-startable across sessions when you use the same group and ID on
168 * restart. This consumer also uses server-side filtering.
171 * A comma separated list of hosts to use to connect to Cambria.
172 * You can include port numbers (3904 is the default). For
173 * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
175 * The topic to consume
176 * @param consumerGroup
177 * The name of the consumer group this consumer is part of
179 * The unique id of this consume in its group
181 * The amount of time in milliseconds that the server should keep
182 * the connection open while waiting for message traffic. Use -1
183 * for default timeout.
185 * A limit on the number of messages returned in a single call.
186 * Use -1 for no limit.
188 * A Highland Park filter expression using only built-in filter
189 * components. Use null for "no filter".
191 * key associated with a user
197 public static CambriaConsumer createConsumer(String hostList,
198 final String topic, final String consumerGroup,
199 final String consumerId, int timeoutMs, int limit, String filter,
200 String apiKey, String apiSecret) {
201 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
202 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
207 * Create a consumer instance with the default timeout, and no limit on
208 * messages returned. This consumer can operate in a logical group and is
209 * re-startable across sessions when you use the same group and ID on
210 * restart. This consumer also uses server-side filtering.
213 * The host used in the URL to Cambria. Entries can be
216 * The topic to consume
217 * @param consumerGroup
218 * The name of the consumer group this consumer is part of
220 * The unique id of this consume in its group
222 * The amount of time in milliseconds that the server should keep
223 * the connection open while waiting for message traffic. Use -1
224 * for default timeout.
226 * A limit on the number of messages returned in a single call.
227 * Use -1 for no limit.
229 * A Highland Park filter expression using only built-in filter
230 * components. Use null for "no filter".
232 * key associated with a user
237 public static CambriaConsumer createConsumer(Collection<String> hostSet,
238 final String topic, final String consumerGroup,
239 final String consumerId, int timeoutMs, int limit, String filter,
240 String apiKey, String apiSecret) {
244 return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
245 consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
246 } catch (MalformedURLException e) {
247 throw new RuntimeException(e);
251 /*************************************************************************/
252 /*************************************************************************/
253 /*************************************************************************/
256 * Create a publisher that sends each message (or group of messages)
257 * immediately. Most applications should favor higher latency for much
258 * higher message throughput and the "simple publisher" is not a good
262 * The host used in the URL to Cambria. Can be "host:port", can
263 * be multiple comma-separated entries.
265 * The topic on which to publish messages.
266 * @return a publisher
268 public static CambriaBatchingPublisher createSimplePublisher(
269 String hostlist, String topic) {
270 return createBatchingPublisher(hostlist, topic, 1, 1);
274 * Create a publisher that batches messages. Be sure to close the publisher
275 * to send the last batch and ensure a clean shutdown. Message payloads are
279 * The host used in the URL to Cambria. Can be "host:port", can
280 * be multiple comma-separated entries.
282 * The topic on which to publish messages.
283 * @param maxBatchSize
284 * The largest set of messages to batch
286 * The maximum age of a message waiting in a batch
288 * @return a publisher
290 public static CambriaBatchingPublisher createBatchingPublisher(
291 String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
292 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
297 * Create a publisher that batches messages. Be sure to close the publisher
298 * to send the last batch and ensure a clean shutdown.
301 * The host used in the URL to Cambria. Can be "host:port", can
302 * be multiple comma-separated entries.
304 * The topic on which to publish messages.
305 * @param maxBatchSize
306 * The largest set of messages to batch
308 * The maximum age of a message waiting in a batch
310 * use gzip compression
312 * @return a publisher
314 public static CambriaBatchingPublisher createBatchingPublisher(
315 String hostlist, String topic, int maxBatchSize, long maxAgeMs,
317 return createBatchingPublisher(
318 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
319 maxBatchSize, maxAgeMs, compress);
323 * Create a publisher that batches messages. Be sure to close the publisher
324 * to send the last batch and ensure a clean shutdown.
327 * A set of hosts to be used in the URL to Cambria. Can be
328 * "host:port". Use multiple entries to enable failover.
330 * The topic on which to publish messages.
331 * @param maxBatchSize
332 * The largest set of messages to batch
334 * The maximum age of a message waiting in a batch
336 * use gzip compression
338 * @return a publisher
340 public static CambriaBatchingPublisher createBatchingPublisher(
341 String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
343 final TreeSet<String> hosts = new TreeSet<String>();
344 for (String hp : hostSet) {
347 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
352 * Create a publisher that batches messages. Be sure to close the publisher
353 * to send the last batch and ensure a clean shutdown.
356 * A set of hosts to be used in the URL to Cambria. Can be
357 * "host:port". Use multiple entries to enable failover.
359 * The topic on which to publish messages.
360 * @param maxBatchSize
361 * The largest set of messages to batch
363 * The maximum age of a message waiting in a batch
365 * use gzip compression
367 * @return a publisher
369 public static CambriaBatchingPublisher createBatchingPublisher(
370 Collection<String> hostSet, String topic, int maxBatchSize,
371 long maxAgeMs, boolean compress) {
372 return new DMaaPCambriaSimplerBatchPublisher.Builder()
373 .againstUrls(hostSet).onTopic(topic)
374 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
378 * Create an identity manager client to work with API keys.
381 * A set of hosts to be used in the URL to Cambria. Can be
382 * "host:port". Use multiple entries to enable failover.
387 * @return an identity manager
392 * Create a topic manager for working with topics.
395 * A set of hosts to be used in the URL to Cambria. Can be
396 * "host:port". Use multiple entries to enable failover.
401 * @return a topic manager
406 * Inject a consumer. Used to support unit tests.
410 public static void $testInject(CambriaConsumer cc) {
414 private static CambriaConsumer sfMock = null;