1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client;
25 import java.io.FileNotFoundException;
26 import java.io.FileReader;
27 import java.io.FileWriter;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.util.Collection;
32 import java.util.Properties;
33 import java.util.TreeSet;
34 import java.util.UUID;
36 import javax.ws.rs.core.MultivaluedMap;
38 import com.att.nsa.mr.client.impl.MRConsumerImpl;
39 import com.att.nsa.mr.client.impl.MRMetaClient;
40 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
44 * A factory for MR clients.<br/>
46 * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates
47 * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive
48 * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's
49 * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across
50 * them. Be sure to use a different ID for each instance.)<br/>
56 public class MRClientFactory
// Shared static state of the factory. Every field is public and non-final, so any caller can reassign it.
// HTTPHeadersMap and DME2HeadersMap are only declared here; no assignment to them is visible in this part of the file.
58 public static MultivaluedMap<String, Object> HTTPHeadersMap;
59 public static Map<String, String> DME2HeadersMap;
// Assigned from the "DME2preferredRouterFilePath" property by the properties-file based factory methods.
60 public static String routeFilePath;
// Assigned a FileReader on routeFilePath by those same methods; no close() call is visible in this view.
62 public static FileReader routeReader;
// Assigned a FileWriter on routeFilePath by those same methods.
// NOTE(review): whether that assignment is conditional cannot be seen from here — confirm.
64 public static FileWriter routeWriter= null;
// Assigned a new Properties instance by those same methods.
65 public static Properties prop=null;
66 //routeReader= new FileReader(new File (routeFilePath));
67 //props= new Properties();
69 * Create a consumer instance with the default timeout and no limit
70 * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
73 * @param hostList A comma separated list of hosts to use to connect to MR.
74 * You can include port numbers (3904 is the default). For example, "hostname:8080,"
76 * @param topic The topic to consume
80 public static MRConsumer createConsumer ( String hostList, String topic )
82 return createConsumer ( MRConsumerImpl.stringToList(hostList), topic );
86 * Create a consumer instance with the default timeout and no limit
87 * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
90 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
91 * @param topic The topic to consume
95 public static MRConsumer createConsumer ( Collection<String> hostSet, String topic )
97 return createConsumer ( hostSet, topic, null );
101 * Create a consumer instance with server-side filtering, the default timeout, and no limit
102 * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
105 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
106 * @param topic The topic to consume
107 * @param filter a filter to use on the server side
111 public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter )
113 return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null );
117 * Create a consumer instance with the default timeout, and no limit
118 * on messages returned. This consumer can operate in a logical group and is re-startable
119 * across sessions when you use the same group and ID on restart.
121 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
122 * @param topic The topic to consume
123 * @param consumerGroup The name of the consumer group this consumer is part of
124 * @param consumerId The unique id of this consumer in its group
128 public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId )
130 return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 );
134 * Create a consumer instance with the given timeout and limit
135 * on messages returned. This consumer can operate in a logical group and is re-startable
136 * across sessions when you use the same group and ID on restart.
138 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
139 * @param topic The topic to consume
140 * @param consumerGroup The name of the consumer group this consumer is part of
141 * @param consumerId The unique id of this consumer in its group
142 * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
143 * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
147 public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit)
149 return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null );
153 * Create a consumer instance with the given timeout and limit
154 * on messages returned. This consumer can operate in a logical group and is re-startable
155 * across sessions when you use the same group and ID on restart. This consumer also uses
156 * server-side filtering.
158 * @param hostList A comma separated list of hosts to use to connect to MR.
159 * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
160 * @param topic The topic to consume
161 * @param consumerGroup The name of the consumer group this consumer is part of
162 * @param consumerId The unique id of this consumer in its group
163 * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
164 * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
165 * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
169 public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup,
170 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
172 return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup,
173 consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
177 * Create a consumer instance with the given timeout and limit
178 * on messages returned. This consumer can operate in a logical group and is re-startable
179 * across sessions when you use the same group and ID on restart. This consumer also uses
180 * server-side filtering.
182 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
183 * @param topic The topic to consume
184 * @param consumerGroup The name of the consumer group this consumer is part of
185 * @param consumerId The unique id of this consumer in its group
186 * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
187 * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
188 * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
192 public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup,
193 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
195 if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock;
197 return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
198 } catch (MalformedURLException e) {
199 throw new RuntimeException(e);
203 /*************************************************************************/
204 /*************************************************************************/
205 /*************************************************************************/
208 * Create a publisher that sends each message (or group of messages) immediately. Most
209 * applications should favor higher latency for much higher message throughput and the
210 * "simple publisher" is not a good choice.
212 * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
213 * @param topic The topic on which to publish messages.
214 * @return a publisher
216 public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic )
218 return createBatchingPublisher ( hostlist, topic, 1, 1 );
222 * Create a publisher that batches messages. Be sure to close the publisher to
223 * send the last batch and ensure a clean shutdown. Message payloads are not compressed.
225 * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
226 * @param topic The topic on which to publish messages.
227 * @param maxBatchSize The largest set of messages to batch
228 * @param maxAgeMs The maximum age of a message waiting in a batch
230 * @return a publisher
232 public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs )
234 return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false );
238 * Create a publisher that batches messages. Be sure to close the publisher to
239 * send the last batch and ensure a clean shutdown.
241 * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
242 * @param topic The topic on which to publish messages.
243 * @param maxBatchSize The largest set of messages to batch
244 * @param maxAgeMs The maximum age of a message waiting in a batch
245 * @param compress use gzip compression
247 * @return a publisher
249 public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
251 return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress );
255 * Create a publisher that batches messages. Be sure to close the publisher to
256 * send the last batch and ensure a clean shutdown.
258 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
259 * @param topic The topic on which to publish messages.
260 * @param maxBatchSize The largest set of messages to batch
261 * @param maxAgeMs The maximum age of a message waiting in a batch
262 * @param compress use gzip compression
264 * @return a publisher
// Array-based convenience overload: creates a TreeSet of host strings and delegates to the Collection-based overload.
266 public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
268 final TreeSet<String> hosts = new TreeSet<String> ();
// NOTE(review): the body of this loop is not visible in this view; presumably each hp is added to hosts — confirm.
// Iterating throws NullPointerException when hostSet is null.
269 for ( String hp : hostSet )
273 return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress );
277 * Create a publisher that batches messages. Be sure to close the publisher to
278 * send the last batch and ensure a clean shutdown.
280 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
281 * @param topic The topic on which to publish messages.
282 * @param maxBatchSize The largest set of messages to batch
283 * @param maxAgeMs The maximum age of a message waiting in a batch
284 * @param compress use gzip compression
286 * @return a publisher
// Returns the result of an MRSimplerBatchPublisher.Builder chain configured with the host URLs,
// the batching limits and the compression flag.
// NOTE(review): no step that applies `topic` and no terminating call of the chain are visible in this view — confirm both exist.
288 public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
290 return new MRSimplerBatchPublisher.Builder ().
291 againstUrls ( hostSet ).
293 batchTo ( maxBatchSize, maxAgeMs ).
294 compress ( compress ).
299 * Create a publisher that batches messages. Be sure to close the publisher to
300 * send the last batch and ensure a clean shutdown.
301 * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
302 * @param topic The topic on which to publish messages.
303 * @param username username
304 * @param password password
305 * @param maxBatchSize The largest set of messages to batch
306 * @param maxAgeMs The maximum age of a message waiting in a batch
307 * @param compress use gzip compression
308 * @param protocolFlag http auth or ueb auth or dme2 method
309 * @param producerFilePath all properties for publisher
310 * @return MRBatchingPublisher obj
// Builds an MRSimplerBatchPublisher from explicit arguments, then stores the credentials, the protocol flag
// and the producer properties path on it through setters.
// NOTE(review): no step that applies `topic`, no end of the builder chain and no return statement are visible in this view — confirm.
312 public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath )
314 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
// The single host string is converted with the same helper the consumer factories use.
315 againstUrls(MRConsumerImpl.stringToList(host)).
317 batchTo ( maxBatchSize, maxAgeMs ).
318 compress ( compress ).
// producerFilePath is only stored on the publisher here; this method does not read the file itself.
322 pub.setUsername(username);
323 pub.setPassword(password);
324 pub.setProtocolFlag(protocolFlag);
325 pub.setProducerFilePath(producerFilePath);
331 * Create a publisher that batches messages. Be sure to close the publisher to
332 * send the last batch and ensure a clean shutdown
333 * @param producerFilePath set all properties for publishing message
334 * @return MRBatchingPublisher obj
335 * @throws FileNotFoundException exc
336 * @throws IOException ioex
// Builds a publisher whose settings are all read from `props`, then records the DME2 route file in the
// class's static fields (routeFilePath, routeReader, prop, routeWriter).
// NOTE(review): the statement that loads `reader` into `props`, any close() of `reader`, and the return
// statement are not visible in this view — confirm they exist.
338 public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException {
339 FileReader reader = new FileReader(new File (producerFilePath));
340 Properties props = new Properties();
// Integer.parseInt throws NumberFormatException when maxBatchSize or MessageSentThreadOccurance is missing
// or non-numeric; compress falls back to false when absent because Boolean.parseBoolean(null) is false.
342 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
343 againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
344 onTopic ( props.getProperty("topic") ).
// maxAgeMs is parsed as an int here although the explicit-argument overloads declare it as a long. The trailing
// toString() is a no-op on a String and throws NullPointerException when the key is absent.
345 batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
346 compress (Boolean.parseBoolean(props.getProperty("compress"))).
347 httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
349 pub.setHost(props.getProperty("host"));
// Throws NullPointerException when TransportType is not present in the properties.
350 if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
352 pub.setAuthKey(props.getProperty("authKey"));
353 pub.setAuthDate(props.getProperty("authDate"));
354 pub.setUsername(props.getProperty("username"));
355 pub.setPassword(props.getProperty("password"));
// NOTE(review): the username/password setters repeat below; presumably this second pair belongs to an else
// branch whose line is not visible here — confirm.
357 pub.setUsername(props.getProperty("username"));
358 pub.setPassword(props.getProperty("password"));
360 pub.setProducerFilePath(producerFilePath);
361 pub.setProtocolFlag(props.getProperty("TransportType"));
// From here on shared static fields are overwritten, so the most recent factory call determines their values.
363 routeFilePath=props.getProperty("DME2preferredRouterFilePath");
// new File(null) throws NullPointerException when the property is missing; FileReader throws
// FileNotFoundException when the route file does not exist.
364 routeReader= new FileReader(new File (routeFilePath));
365 prop= new Properties();
366 File fo= new File(routeFilePath);
// NOTE(review): FileWriter(File) truncates an existing file. No use of `fo` is visible here, so confirm that
// this assignment is guarded (e.g. by an existence check).
368 routeWriter=new FileWriter(new File(routeFilePath));
370 //pub.setContentType(contentType);
375 * Create a publisher that will contain send methods that return
376 * response object to user.
377 * @param producerFilePath set all properties for publishing message
378 * @return MRBatchingPublisher obj
379 * @throws FileNotFoundException exc
380 * @throws IOException ioex
// Same properties-driven construction as the single-argument overload, with the caller's withResponse flag
// additionally passed to the builder. Also overwrites the class's static route fields.
// NOTE(review): the statement that loads `reader` into `props`, any close() of `reader`, and the return
// statement are not visible in this view — confirm they exist.
382 public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException {
383 FileReader reader = new FileReader(new File (producerFilePath));
384 Properties props = new Properties();
// Integer.parseInt throws NumberFormatException when maxBatchSize or MessageSentThreadOccurance is missing
// or non-numeric; compress falls back to false when absent because Boolean.parseBoolean(null) is false.
386 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
387 againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
388 onTopic ( props.getProperty("topic") ).
// maxAgeMs is parsed as an int here although the explicit-argument overloads declare it as a long. The trailing
// toString() is a no-op on a String and throws NullPointerException when the key is absent.
389 batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
390 compress (Boolean.parseBoolean(props.getProperty("compress"))).
391 httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
392 withResponse(withResponse).
394 pub.setHost(props.getProperty("host"));
// Throws NullPointerException when TransportType is not present in the properties.
395 if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
397 pub.setAuthKey(props.getProperty("authKey"));
398 pub.setAuthDate(props.getProperty("authDate"));
399 pub.setUsername(props.getProperty("username"));
400 pub.setPassword(props.getProperty("password"));
// NOTE(review): the username/password setters repeat below; presumably this second pair belongs to an else
// branch whose line is not visible here — confirm.
402 pub.setUsername(props.getProperty("username"));
403 pub.setPassword(props.getProperty("password"));
405 pub.setProducerFilePath(producerFilePath);
406 pub.setProtocolFlag(props.getProperty("TransportType"));
// From here on shared static fields are overwritten, so the most recent factory call determines their values.
408 routeFilePath=props.getProperty("DME2preferredRouterFilePath");
// new File(null) throws NullPointerException when the property is missing; FileReader throws
// FileNotFoundException when the route file does not exist.
409 routeReader= new FileReader(new File (routeFilePath));
410 prop= new Properties();
411 File fo= new File(routeFilePath);
// NOTE(review): FileWriter(File) truncates an existing file. No use of `fo` is visible here, so confirm that
// this assignment is guarded (e.g. by an existence check).
413 routeWriter=new FileWriter(new File(routeFilePath));
415 //pub.setContentType(contentType);
430 * Create an identity manager client to work with API keys.
431 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
432 * @param apiKey Your API key
433 * @param apiSecret Your API secret
434 * @return an identity manager
// Creates an MRMetaClient for the given hosts, exposes it as an MRIdentityManager and stores the API
// credentials on it.
// NOTE(review): the opening of the try block and the return statement are not visible in this view — confirm.
436 public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret )
438 MRIdentityManager cim;
440 cim = new MRMetaClient ( hostSet );
// The checked MalformedURLException is rethrown unchecked, with the original exception kept as the cause.
441 } catch (MalformedURLException e) {
442 throw new RuntimeException(e);
444 cim.setApiCredentials ( apiKey, apiSecret );
449 * Create a topic manager for working with topics.
450 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
451 * @param apiKey Your API key
452 * @param apiSecret Your API secret
453 * @return a topic manager
// Creates an MRMetaClient for the given hosts and stores the API credentials on it; the method's declared
// return type is MRTopicManager.
// NOTE(review): the declaration of `tmi`, the opening of the try block and the return statement are not
// visible in this view — confirm.
455 public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret )
459 tmi = new MRMetaClient ( hostSet );
// The checked MalformedURLException is rethrown unchecked, with the original exception kept as the cause.
460 } catch (MalformedURLException e) {
461 throw new RuntimeException(e);
463 tmi.setApiCredentials ( apiKey, apiSecret );
468 * Inject a consumer. Used to support unit tests.
471 public static void $testInject ( MRConsumer cc )
473 MRClientBuilders.sfConsumerMock = cc;
// Builds an MRConsumerImpl directly from explicit arguments and then stores the credentials, protocol flag and
// consumer properties path on it. No check of the injected test consumer is visible in this overload.
// `i` and `j` occupy the constructor positions where the nine-argument createConsumer passes timeoutMs and limit;
// filter, apiKey and apiSecret are passed as null.
// NOTE(review): the declaration of `sub`, the opening of the try block, the return statement and possibly
// further setter calls are not visible in this view — confirm.
476 public static MRConsumer createConsumer(String host, String topic, String username,
477 String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) {
481 sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
// The checked MalformedURLException is rethrown unchecked, with the original exception kept as the cause.
482 } catch (MalformedURLException e) {
483 throw new RuntimeException(e);
485 sub.setUsername(username);
486 sub.setPassword(password);
488 sub.setProtocolFlag(protocalFlag);
489 sub.setConsumerFilePath(consumerFilePath);
// Variant of the ten-argument createConsumer that takes the two ints last instead of before the protocol flag;
// the visible statements are otherwise the same as in that overload.
// `i` and `j` occupy the constructor positions where the nine-argument createConsumer passes timeoutMs and limit;
// filter, apiKey and apiSecret are passed as null.
// NOTE(review): the declaration of `sub`, the opening of the try block, the return statement and possibly
// further setter calls are not visible in this view — confirm.
494 public static MRConsumer createConsumer(String host, String topic, String username,
495 String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) {
499 sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
// The checked MalformedURLException is rethrown unchecked, with the original exception kept as the cause.
500 } catch (MalformedURLException e) {
501 throw new RuntimeException(e);
503 sub.setUsername(username);
504 sub.setPassword(password);
506 sub.setProtocolFlag(protocalFlag);
507 sub.setConsumerFilePath(consumerFilePath);
512 public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException {
513 FileReader reader = new FileReader(new File (consumerFilePath));
514 Properties props = new Properties();
517 if(props.getProperty("timeout")!=null)
518 timeout=Integer.parseInt(props.getProperty("timeout"));
522 if(props.getProperty("limit")!=null)
523 limit=Integer.parseInt(props.getProperty("limit"));
527 if(props.getProperty("group")==null)
528 group=UUID.randomUUID ().toString();
530 group=props.getProperty("group");
531 MRConsumerImpl sub=null;
532 if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
533 sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") );
534 sub.setAuthKey(props.getProperty("authKey"));
535 sub.setAuthDate(props.getProperty("authDate"));
536 sub.setUsername(props.getProperty("username"));
537 sub.setPassword(props.getProperty("password"));
539 sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") );
540 sub.setUsername(props.getProperty("username"));
541 sub.setPassword(props.getProperty("password"));
543 sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
545 sub.setHost(props.getProperty("host"));
546 sub.setProtocolFlag(props.getProperty("TransportType"));
547 //sub.setConsumerFilePath(consumerFilePath);
548 sub.setfFilter(props.getProperty("filter"));
549 routeFilePath=props.getProperty("DME2preferredRouterFilePath");
550 routeReader= new FileReader(new File (routeFilePath));
551 prop= new Properties();
552 File fo= new File(routeFilePath);
554 routeWriter=new FileWriter(new File(routeFilePath));