1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.metrics.publisher;
24 import java.net.MalformedURLException;
25 import java.nio.channels.NotYetConnectedException;
26 import java.util.Collection;
27 import java.util.TreeSet;
28 import java.util.UUID;
30 import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
31 import com.att.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
34 * A factory for Cambria clients.<br/>
36 * Use caution selecting a consumer creator factory. If the call doesn't accept
37 * a consumer group name, then it creates a consumer that is not restartable.
38 * That is, if you stop your process and start it again, your client will NOT
39 * receive any missed messages on the topic. If you need to ensure receipt of
40 * missed messages, then you must use a consumer that's created with a group
41 * name and ID. (If you create multiple consumer processes using the same group,
42 * load is split across them. Be sure to use a different ID for each instance.)<br/>
48 public class DMaaPCambriaClientFactory {
50 * Create a consumer instance with the default timeout and no limit on
51 * messages returned. This consumer operates as an independent consumer
52 * (i.e., not in a group) and is NOT re-startable across sessions.
55 * A comma separated list of hosts to use to connect to Cambria.
56 * You can include port numbers (3904 is the default). For
57 * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
60 * The topic to consume
64 public static CambriaConsumer createConsumer(String hostList, String topic) {
65 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
70 * Create a consumer instance with the default timeout and no limit on
71 * messages returned. This consumer operates as an independent consumer
72 * (i.e., not in a group) and is NOT re-startable across sessions.
75 * The host used in the URL to Cambria. Entries can be
78 * The topic to consume
82 public static CambriaConsumer createConsumer(Collection<String> hostSet,
84 return createConsumer(hostSet, topic, null);
88 * Create a consumer instance with server-side filtering, the default
89 * timeout, and no limit on messages returned. This consumer operates as an
90 * independent consumer (i.e., not in a group) and is NOT re-startable
94 * The host used in the URL to Cambria. Entries can be
97 * The topic to consume
99 * a filter to use on the server side
103 public static CambriaConsumer createConsumer(Collection<String> hostSet,
104 String topic, String filter) {
105 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
106 "0", -1, -1, filter, null, null);
110 * Create a consumer instance with the default timeout, and no limit on
111 * messages returned. This consumer can operate in a logical group and is
112 * re-startable across sessions when you use the same group and ID on
116 * The host used in the URL to Cambria. Entries can be
119 * The topic to consume
120 * @param consumerGroup
121 * The name of the consumer group this consumer is part of
123 * The unique id of this consumer in its group
127 public static CambriaConsumer createConsumer(Collection<String> hostSet,
128 final String topic, final String consumerGroup,
129 final String consumerId) {
130 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
134 * Create a consumer instance with a caller-supplied timeout and limit on
135 * messages returned. This consumer can operate in a logical group and is
136 * re-startable across sessions when you use the same group and ID on
140 * The host used in the URL to Cambria. Entries can be
143 * The topic to consume
144 * @param consumerGroup
145 * The name of the consumer group this consumer is part of
147 * The unique id of this consumer in its group
149 * The amount of time in milliseconds that the server should keep
150 * the connection open while waiting for message traffic. Use -1
151 * for default timeout.
153 * A limit on the number of messages returned in a single call.
154 * Use -1 for no limit.
158 public static CambriaConsumer createConsumer(Collection<String> hostSet,
159 final String topic, final String consumerGroup,
160 final String consumerId, int timeoutMs, int limit) {
161 return createConsumer(hostSet, topic, consumerGroup, consumerId,
162 timeoutMs, limit, null, null, null);
166 * Create a consumer instance with a caller-supplied timeout and limit on
167 * messages returned. This consumer can operate in a logical group and is
168 * re-startable across sessions when you use the same group and ID on
169 * restart. This consumer also uses server-side filtering.
172 * A comma separated list of hosts to use to connect to Cambria.
173 * You can include port numbers (3904 is the default). For
174 * example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
176 * The topic to consume
177 * @param consumerGroup
178 * The name of the consumer group this consumer is part of
180 * The unique id of this consumer in its group
182 * The amount of time in milliseconds that the server should keep
183 * the connection open while waiting for message traffic. Use -1
184 * for default timeout.
186 * A limit on the number of messages returned in a single call.
187 * Use -1 for no limit.
189 * A Highland Park filter expression using only built-in filter
190 * components. Use null for "no filter".
192 * key associated with a user
198 public static CambriaConsumer createConsumer(String hostList,
199 final String topic, final String consumerGroup,
200 final String consumerId, int timeoutMs, int limit, String filter,
201 String apiKey, String apiSecret) {
202 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
203 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
208 * Create a consumer instance with a caller-supplied timeout and limit on
209 * messages returned. This consumer can operate in a logical group and is
210 * re-startable across sessions when you use the same group and ID on
211 * restart. This consumer also uses server-side filtering.
214 * The host used in the URL to Cambria. Entries can be
217 * The topic to consume
218 * @param consumerGroup
219 * The name of the consumer group this consumer is part of
221 * The unique id of this consumer in its group
223 * The amount of time in milliseconds that the server should keep
224 * the connection open while waiting for message traffic. Use -1
225 * for default timeout.
227 * A limit on the number of messages returned in a single call.
228 * Use -1 for no limit.
230 * A Highland Park filter expression using only built-in filter
231 * components. Use null for "no filter".
233 * key associated with a user
238 public static CambriaConsumer createConsumer(Collection<String> hostSet,
239 final String topic, final String consumerGroup,
240 final String consumerId, int timeoutMs, int limit, String filter,
241 String apiKey, String apiSecret) {
245 return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
246 consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
247 } catch (MalformedURLException e) {
249 NotYetConnectedException exception=new NotYetConnectedException();
250 exception.setStackTrace(e.getStackTrace());
256 /*************************************************************************/
257 /*************************************************************************/
258 /*************************************************************************/
261 * Create a publisher that sends each message (or group of messages)
262 * immediately. Most applications should favor higher latency for much
263 * higher message throughput and the "simple publisher" is not a good
267 * The host used in the URL to Cambria. Can be "host:port", can
268 * be multiple comma-separated entries.
270 * The topic on which to publish messages.
271 * @return a publisher
273 public static CambriaBatchingPublisher createSimplePublisher(
274 String hostlist, String topic) {
275 return createBatchingPublisher(hostlist, topic, 1, 1);
279 * Create a publisher that batches messages. Be sure to close the publisher
280 * to send the last batch and ensure a clean shutdown. Message payloads are
284 * The host used in the URL to Cambria. Can be "host:port", can
285 * be multiple comma-separated entries.
287 * The topic on which to publish messages.
288 * @param maxBatchSize
289 * The largest set of messages to batch
291 * The maximum age of a message waiting in a batch
293 * @return a publisher
// Convenience overload: forwards hostlist, topic, maxBatchSize and maxAgeMs to a
// wider createBatchingPublisher overload.
295 public static CambriaBatchingPublisher createBatchingPublisher(
296 String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
// NOTE(review): the remaining argument(s) of this call are not visible in this
// excerpt — confirm which value is supplied for the extra parameter.
297 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
302 * Create a publisher that batches messages. Be sure to close the publisher
303 * to send the last batch and ensure a clean shutdown.
306 * The host used in the URL to Cambria. Can be "host:port", can
307 * be multiple comma-separated entries.
309 * The topic on which to publish messages.
310 * @param maxBatchSize
311 * The largest set of messages to batch
313 * The maximum age of a message waiting in a batch
315 * use gzip compression
317 * @return a publisher
319 public static CambriaBatchingPublisher createBatchingPublisher(
320 String hostlist, String topic, int maxBatchSize, long maxAgeMs,
322 return createBatchingPublisher(
323 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
324 maxBatchSize, maxAgeMs, compress);
328 * Create a publisher that batches messages. Be sure to close the publisher
329 * to send the last batch and ensure a clean shutdown.
332 * A set of hosts to be used in the URL to Cambria. Can be
333 * "host:port". Use multiple entries to enable failover.
335 * The topic on which to publish messages.
336 * @param maxBatchSize
337 * The largest set of messages to batch
339 * The maximum age of a message waiting in a batch
341 * use gzip compression
343 * @return a publisher
// Array-based overload: creates a TreeSet for the host entries and forwards it
// to another createBatchingPublisher overload.
345 public static CambriaBatchingPublisher createBatchingPublisher(
346 String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
// A TreeSet holds each distinct entry once, in natural String order.
348 final TreeSet<String> hosts = new TreeSet<String>();
// NOTE(review): the loop body is not visible in this excerpt; presumably each
// array entry is added to hosts — confirm.
349 for (String hp : hostSet) {
// Forwards the collected hosts; the trailing argument(s) of this call are cut
// off in this excerpt.
352 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
357 * Create a publisher that batches messages. Be sure to close the publisher
358 * to send the last batch and ensure a clean shutdown.
361 * A set of hosts to be used in the URL to Cambria. Can be
362 * "host:port". Use multiple entries to enable failover.
364 * The topic on which to publish messages.
365 * @param maxBatchSize
366 * The largest set of messages to batch
368 * The maximum age of a message waiting in a batch
370 * use gzip compression
372 * @return a publisher
374 public static CambriaBatchingPublisher createBatchingPublisher(
375 Collection<String> hostSet, String topic, int maxBatchSize,
376 long maxAgeMs, boolean compress) {
377 return new DMaaPCambriaSimplerBatchPublisher.Builder()
378 .againstUrls(hostSet).onTopic(topic)
379 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
383 * Create an identity manager client to work with API keys.
386 * A set of hosts to be used in the URL to Cambria. Can be
387 * "host:port". Use multiple entries to enable failover.
392 * @return an identity manager
397 * Create a topic manager for working with topics.
400 * A set of hosts to be used in the URL to Cambria. Can be
401 * "host:port". Use multiple entries to enable failover.
406 * @return a topic manager
411 * Inject a consumer. Used to support unit tests.
// Test hook that takes a CambriaConsumer to inject.
// NOTE(review): the method body is not visible in this excerpt; presumably it
// stores cc in sfMock — confirm.
415 public static void $testInject(CambriaConsumer cc) {
// Static slot for a CambriaConsumer; starts out null.
// NOTE(review): presumably the consumer injected through $testInject for unit
// tests — confirm where it is written and read.
419 private static CambriaConsumer sfMock = null;