1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.metrics.publisher;
24 import java.net.MalformedURLException;
25 import java.nio.channels.NotYetConnectedException;
26 import java.util.Collection;
27 import java.util.TreeSet;
28 import java.util.UUID;
30 import org.onap.dmaap.dmf.mr.metrics.publisher.impl.DMaaPCambriaConsumerImpl;
31 import org.onap.dmaap.dmf.mr.metrics.publisher.impl.DMaaPCambriaSimplerBatchPublisher;
/**
 * A factory for Cambria clients.<br/>
 * <br/>
 * Use caution when selecting a consumer creator factory. If the call doesn't
 * accept a consumer group name, then it creates a consumer that is not
 * restartable. That is, if you stop your process and start it again, your
 * client will NOT receive any missed messages on the topic. If you need to
 * ensure receipt of missed messages, then you must use a consumer that's
 * created with a group name and ID. (If you create multiple consumer processes
 * using the same group, load is split across them. Be sure to use a different
 * ID for each instance.)<br/>
 */
48 public class DMaaPCambriaClientFactory {
50 * Create a consumer instance with the default timeout and no limit on
51 * messages returned. This consumer operates as an independent consumer
52 * (i.e., not in a group) and is NOT re-startable across sessions.
55 * A comma separated list of hosts to use to connect to Cambria.
56 * You can include port numbers (3904 is the default). For
59 * The topic to consume
63 public static CambriaConsumer createConsumer(String hostList, String topic) {
64 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
69 * Create a consumer instance with the default timeout and no limit on
70 * messages returned. This consumer operates as an independent consumer
71 * (i.e., not in a group) and is NOT re-startable across sessions.
74 * The host used in the URL to Cambria. Entries can be
77 * The topic to consume
81 public static CambriaConsumer createConsumer(Collection<String> hostSet,
83 return createConsumer(hostSet, topic, null);
87 * Create a consumer instance with server-side filtering, the default
88 * timeout, and no limit on messages returned. This consumer operates as an
89 * independent consumer (i.e., not in a group) and is NOT re-startable
93 * The host used in the URL to Cambria. Entries can be
96 * The topic to consume
98 * a filter to use on the server side
102 public static CambriaConsumer createConsumer(Collection<String> hostSet,
103 String topic, String filter) {
104 return createConsumer(hostSet, topic, UUID.randomUUID().toString(),
105 "0", -1, -1, filter, null, null);
109 * Create a consumer instance with the default timeout, and no limit on
110 * messages returned. This consumer can operate in a logical group and is
111 * re-startable across sessions when you use the same group and ID on
115 * The host used in the URL to Cambria. Entries can be
118 * The topic to consume
119 * @param consumerGroup
120 * The name of the consumer group this consumer is part of
122 * The unique id of this consume in its group
126 public static CambriaConsumer createConsumer(Collection<String> hostSet,
127 final String topic, final String consumerGroup,
128 final String consumerId) {
129 return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
133 * Create a consumer instance with the default timeout, and no limit on
134 * messages returned. This consumer can operate in a logical group and is
135 * re-startable across sessions when you use the same group and ID on
139 * The host used in the URL to Cambria. Entries can be
142 * The topic to consume
143 * @param consumerGroup
144 * The name of the consumer group this consumer is part of
146 * The unique id of this consume in its group
148 * The amount of time in milliseconds that the server should keep
149 * the connection open while waiting for message traffic. Use -1
150 * for default timeout.
152 * A limit on the number of messages returned in a single call.
153 * Use -1 for no limit.
157 public static CambriaConsumer createConsumer(Collection<String> hostSet,
158 final String topic, final String consumerGroup,
159 final String consumerId, int timeoutMs, int limit) {
160 return createConsumer(hostSet, topic, consumerGroup, consumerId,
161 timeoutMs, limit, null, null, null);
165 * Create a consumer instance with the default timeout, and no limit on
166 * messages returned. This consumer can operate in a logical group and is
167 * re-startable across sessions when you use the same group and ID on
168 * restart. This consumer also uses server-side filtering.
171 * A comma separated list of hosts to use to connect to Cambria.
172 * You can include port numbers (3904 is the default). For
174 * The topic to consume
175 * @param consumerGroup
176 * The name of the consumer group this consumer is part of
178 * The unique id of this consume in its group
180 * The amount of time in milliseconds that the server should keep
181 * the connection open while waiting for message traffic. Use -1
182 * for default timeout.
184 * A limit on the number of messages returned in a single call.
185 * Use -1 for no limit.
187 * A Highland Park filter expression using only built-in filter
188 * components. Use null for "no filter".
190 * key associated with a user
196 public static CambriaConsumer createConsumer(String hostList,
197 final String topic, final String consumerGroup,
198 final String consumerId, int timeoutMs, int limit, String filter,
199 String apiKey, String apiSecret) {
200 return createConsumer(DMaaPCambriaConsumerImpl.stringToList(hostList),
201 topic, consumerGroup, consumerId, timeoutMs, limit, filter,
206 * Create a consumer instance with the default timeout, and no limit on
207 * messages returned. This consumer can operate in a logical group and is
208 * re-startable across sessions when you use the same group and ID on
209 * restart. This consumer also uses server-side filtering.
212 * The host used in the URL to Cambria. Entries can be
215 * The topic to consume
216 * @param consumerGroup
217 * The name of the consumer group this consumer is part of
219 * The unique id of this consume in its group
221 * The amount of time in milliseconds that the server should keep
222 * the connection open while waiting for message traffic. Use -1
223 * for default timeout.
225 * A limit on the number of messages returned in a single call.
226 * Use -1 for no limit.
228 * A Highland Park filter expression using only built-in filter
229 * components. Use null for "no filter".
231 * key associated with a user
236 public static CambriaConsumer createConsumer(Collection<String> hostSet,
237 final String topic, final String consumerGroup,
238 final String consumerId, int timeoutMs, int limit, String filter,
239 String apiKey, String apiSecret) {
243 return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
244 consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
245 } catch (MalformedURLException e) {
247 NotYetConnectedException exception=new NotYetConnectedException();
248 exception.setStackTrace(e.getStackTrace());
254 /*************************************************************************/
255 /*************************************************************************/
256 /*************************************************************************/
259 * Create a publisher that sends each message (or group of messages)
260 * immediately. Most applications should favor higher latency for much
261 * higher message throughput and the "simple publisher" is not a good
265 * The host used in the URL to Cambria. Can be "host:port", can
266 * be multiple comma-separated entries.
268 * The topic on which to publish messages.
269 * @return a publisher
271 public static CambriaBatchingPublisher createSimplePublisher(
272 String hostlist, String topic) {
273 return createBatchingPublisher(hostlist, topic, 1, 1);
277 * Create a publisher that batches messages. Be sure to close the publisher
278 * to send the last batch and ensure a clean shutdown. Message payloads are
282 * The host used in the URL to Cambria. Can be "host:port", can
283 * be multiple comma-separated entries.
285 * The topic on which to publish messages.
286 * @param maxBatchSize
287 * The largest set of messages to batch
289 * The maximum age of a message waiting in a batch
291 * @return a publisher
293 public static CambriaBatchingPublisher createBatchingPublisher(
294 String hostlist, String topic, int maxBatchSize, long maxAgeMs) {
295 return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs,
300 * Create a publisher that batches messages. Be sure to close the publisher
301 * to send the last batch and ensure a clean shutdown.
304 * The host used in the URL to Cambria. Can be "host:port", can
305 * be multiple comma-separated entries.
307 * The topic on which to publish messages.
308 * @param maxBatchSize
309 * The largest set of messages to batch
311 * The maximum age of a message waiting in a batch
313 * use gzip compression
315 * @return a publisher
317 public static CambriaBatchingPublisher createBatchingPublisher(
318 String hostlist, String topic, int maxBatchSize, long maxAgeMs,
320 return createBatchingPublisher(
321 DMaaPCambriaConsumerImpl.stringToList(hostlist), topic,
322 maxBatchSize, maxAgeMs, compress);
326 * Create a publisher that batches messages. Be sure to close the publisher
327 * to send the last batch and ensure a clean shutdown.
330 * A set of hosts to be used in the URL to Cambria. Can be
331 * "host:port". Use multiple entries to enable failover.
333 * The topic on which to publish messages.
334 * @param maxBatchSize
335 * The largest set of messages to batch
337 * The maximum age of a message waiting in a batch
339 * use gzip compression
341 * @return a publisher
343 public static CambriaBatchingPublisher createBatchingPublisher(
344 String[] hostSet, String topic, int maxBatchSize, long maxAgeMs,
346 final TreeSet<String> hosts = new TreeSet<String>();
347 for (String hp : hostSet) {
350 return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs,
355 * Create a publisher that batches messages. Be sure to close the publisher
356 * to send the last batch and ensure a clean shutdown.
359 * A set of hosts to be used in the URL to Cambria. Can be
360 * "host:port". Use multiple entries to enable failover.
362 * The topic on which to publish messages.
363 * @param maxBatchSize
364 * The largest set of messages to batch
366 * The maximum age of a message waiting in a batch
368 * use gzip compression
370 * @return a publisher
372 public static CambriaBatchingPublisher createBatchingPublisher(
373 Collection<String> hostSet, String topic, int maxBatchSize,
374 long maxAgeMs, boolean compress) {
375 return new DMaaPCambriaSimplerBatchPublisher.Builder()
376 .againstUrls(hostSet).onTopic(topic)
377 .batchTo(maxBatchSize, maxAgeMs).compress(compress).build();
/**
 * Create an identity manager client to work with API keys.
 *
 * @param hostSet
 *            A set of hosts to be used in the URL to Cambria. Can be
 *            "host:port". Use multiple entries to enable failover.
 * @return an identity manager
 */
/**
 * Create a topic manager for working with topics.
 *
 * @param hostSet
 *            A set of hosts to be used in the URL to Cambria. Can be
 *            "host:port". Use multiple entries to enable failover.
 * @return a topic manager
 */
409 * Inject a consumer. Used to support unit tests.
413 public static void $testInject(CambriaConsumer cc) {
417 private static CambriaConsumer sfMock = null;