59e472c7af0e7152803b05402dfa6cbef32d8fe5
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / MRClientFactory.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client;
23
24 import java.io.File;
25 import java.io.FileNotFoundException;
26 import java.io.FileReader;
27 import java.io.FileWriter;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.util.Collection;
31 import java.util.Map;
32 import java.util.Properties;
33 import java.util.TreeSet;
34 import java.util.UUID;
35
36 import javax.ws.rs.core.MultivaluedMap;
37
38 import com.att.nsa.mr.client.impl.MRConsumerImpl;
39 import com.att.nsa.mr.client.impl.MRMetaClient;
40 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
42
43 /**
44  * A factory for MR clients.<br/>
45  * <br/>
46  * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates
47  * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive
48  * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's
49  * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across
50  * them. Be sure to use a different ID for each instance.)<br/>
51  * <br/>
52  * Publishers  
53  * 
54  * @author author
55  */
56 public class MRClientFactory
57 {
58         public static MultivaluedMap<String, Object> HTTPHeadersMap;
59         public static Map<String, String> DME2HeadersMap;
60         public static String routeFilePath;
61         
62         public static FileReader routeReader;
63         
64         public static FileWriter routeWriter= null;
65         public static Properties prop=null;
66         //routeReader= new FileReader(new File (routeFilePath));
67         //props= new Properties();
68         /**
69          * Create a consumer instance with the default timeout and no limit
70          * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
71          * across sessions.
72          * 
73          * @param hostList A comma separated list of hosts to use to connect to MR.
74          * You can include port numbers (3904 is the default). For example, "hostname:8080,"
75          * 
76          * @param topic The topic to consume
77          * 
78          * @return a consumer
79          */
80         public static MRConsumer createConsumer ( String hostList, String topic )
81         {
82                 return createConsumer ( MRConsumerImpl.stringToList(hostList), topic );
83         }
84
85         /**
86          * Create a consumer instance with the default timeout and no limit
87          * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
88          * across sessions.
89          * 
90          * @param hostSet The host used in the URL to MR. Entries can be "host:port".
91          * @param topic The topic to consume
92          * 
93          * @return a consumer
94          */
95         public static MRConsumer createConsumer ( Collection<String> hostSet, String topic )
96         {
97                 return createConsumer ( hostSet, topic, null );
98         }
99
100         /**
101          * Create a consumer instance with server-side filtering, the default timeout, and no limit
102          * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
103          * across sessions.
104          * 
105          * @param hostSet The host used in the URL to MR. Entries can be "host:port".
106          * @param topic The topic to consume
107          * @param filter a filter to use on the server side
108          * 
109          * @return a consumer
110          */
111         public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter )
112         {
113                 return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null );
114         }
115
116         /**
117          * Create a consumer instance with the default timeout, and no limit
118          * on messages returned. This consumer can operate in a logical group and is re-startable
119          * across sessions when you use the same group and ID on restart.
120          * 
121          * @param hostSet The host used in the URL to MR. Entries can be "host:port".
122          * @param topic The topic to consume
123          * @param consumerGroup The name of the consumer group this consumer is part of
124          * @param consumerId The unique id of this consume in its group
125          * 
126          * @return a consumer
127          */
128         public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId )
129         {
130                 return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 );
131         }
132
133         /**
134          * Create a consumer instance with the default timeout, and no limit
135          * on messages returned. This consumer can operate in a logical group and is re-startable
136          * across sessions when you use the same group and ID on restart.
137          * 
138          * @param hostSet The host used in the URL to MR. Entries can be "host:port".
139          * @param topic The topic to consume
140          * @param consumerGroup The name of the consumer group this consumer is part of
141          * @param consumerId The unique id of this consume in its group
142          * @param timeoutMs     The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
143          * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
144          * 
145          * @return a consumer
146          */
147         public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit)
148         {
149                 return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null );
150         }
151
152         /**
153          * Create a consumer instance with the default timeout, and no limit
154          * on messages returned. This consumer can operate in a logical group and is re-startable
155          * across sessions when you use the same group and ID on restart. This consumer also uses
156          * server-side filtering.
157          * 
158          * @param hostList A comma separated list of hosts to use to connect to MR.
159          * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
160          * @param topic The topic to consume
161          * @param consumerGroup The name of the consumer group this consumer is part of
162          * @param consumerId The unique id of this consume in its group
163          * @param timeoutMs     The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
164          * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
165          * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
166          * 
167          * @return a consumer
168          */
169         public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup,
170                 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
171         {
172                 return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup,
173                         consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
174         }
175
176         /**
177          * Create a consumer instance with the default timeout, and no limit
178          * on messages returned. This consumer can operate in a logical group and is re-startable
179          * across sessions when you use the same group and ID on restart. This consumer also uses
180          * server-side filtering.
181          * 
182          * @param hostSet The host used in the URL to MR. Entries can be "host:port".
183          * @param topic The topic to consume
184          * @param consumerGroup The name of the consumer group this consumer is part of
185          * @param consumerId The unique id of this consume in its group
186          * @param timeoutMs     The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
187          * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
188          * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
189          * 
190          * @return a consumer
191          */
192         public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup,
193                 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
194         {
195                 if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock;
196                 try {
197                         return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
198                 } catch (MalformedURLException e) {
199                         throw new RuntimeException(e);
200                 }
201         }
202
203         /*************************************************************************/
204         /*************************************************************************/
205         /*************************************************************************/
206         
207         /**
208          * Create a publisher that sends each message (or group of messages) immediately. Most
209          * applications should favor higher latency for much higher message throughput and the
210          * "simple publisher" is not a good choice. 
211          *  
212          * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
213          * @param topic The topic on which to publish messages.
214          * @return a publisher
215          */
216         public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic )
217         {
218                 return createBatchingPublisher ( hostlist, topic, 1, 1 );
219         }
220
221         /**
222          * Create a publisher that batches messages. Be sure to close the publisher to
223          * send the last batch and ensure a clean shutdown. Message payloads are not compressed.
224          * 
225          * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
226          * @param topic The topic on which to publish messages.
227          * @param maxBatchSize The largest set of messages to batch
228          * @param maxAgeMs The maximum age of a message waiting in a batch
229          * 
230          * @return a publisher
231          */
232         public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs )
233         {
234                 return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false );
235         }
236
237         /**
238          * Create a publisher that batches messages. Be sure to close the publisher to
239          * send the last batch and ensure a clean shutdown. 
240          * 
241          * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
242          * @param topic The topic on which to publish messages.
243          * @param maxBatchSize The largest set of messages to batch
244          * @param maxAgeMs The maximum age of a message waiting in a batch
245          * @param compress use gzip compression
246          * 
247          * @return a publisher
248          */
249         public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
250         {
251                 return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress );
252         }
253
254         /**
255          * Create a publisher that batches messages. Be sure to close the publisher to
256          * send the last batch and ensure a clean shutdown. 
257          * 
258          * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
259          * @param topic The topic on which to publish messages.
260          * @param maxBatchSize The largest set of messages to batch
261          * @param maxAgeMs The maximum age of a message waiting in a batch
262          * @param compress use gzip compression
263          * 
264          * @return a publisher
265          */
266         public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
267         {
268                 final TreeSet<String> hosts = new TreeSet<String> ();
269                 for ( String hp : hostSet )
270                 {
271                         hosts.add ( hp );
272                 }
273                 return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress );
274         }
275
276         /**
277          * Create a publisher that batches messages. Be sure to close the publisher to
278          * send the last batch and ensure a clean shutdown. 
279          * 
280          * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
281          * @param topic The topic on which to publish messages.
282          * @param maxBatchSize The largest set of messages to batch
283          * @param maxAgeMs The maximum age of a message waiting in a batch
284          * @param compress use gzip compression
285          * 
286          * @return a publisher
287          */
288         public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
289         {
290                 return new MRSimplerBatchPublisher.Builder ().
291                         againstUrls ( hostSet ).
292                         onTopic ( topic ).
293                         batchTo ( maxBatchSize, maxAgeMs ).
294                         compress ( compress ).
295                         build ();
296         }
297         
298         /**
299          * Create a publisher that batches messages. Be sure to close the publisher to
300          * send the last batch and ensure a clean shutdown. 
301          * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
302          * @param topic The topic on which to publish messages.
303          * @param username username 
304          * @param password password
305          * @param maxBatchSize The largest set of messages to batch
306          * @param maxAgeMs The maximum age of a message waiting in a batch
307          * @param compress use gzip compression
308          * @param protocolFlag  http auth or ueb auth or dme2 method
309          * @param producerFilePath all properties for publisher
310          * @return MRBatchingPublisher obj
311          */
312         public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath )
313         {
314                 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
315                         againstUrls(MRConsumerImpl.stringToList(host)).
316                         onTopic ( topic ).
317                         batchTo ( maxBatchSize, maxAgeMs ).
318                         compress ( compress ).
319                         build ();
320                 
321                 pub.setHost(host);
322                 pub.setUsername(username);
323                 pub.setPassword(password);
324                 pub.setProtocolFlag(protocolFlag);
325                 pub.setProducerFilePath(producerFilePath);
326                 return pub;
327         }
328         
329         
330         /**
331          * Create a publisher that batches messages. Be sure to close the publisher to
332          * send the last batch and ensure a clean shutdown
333          * @param producerFilePath set all properties for publishing message
334          * @return MRBatchingPublisher obj
335          * @throws FileNotFoundException exc
336          * @throws IOException ioex
337          */
338         public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException    {
339                 FileReader reader = new FileReader(new File (producerFilePath));
340                 Properties props = new Properties();            
341                 props.load(reader);
342                 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
343                         againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
344                         onTopic ( props.getProperty("topic") ).
345                         batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
346                         compress (Boolean.parseBoolean(props.getProperty("compress"))).
347                         httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
348                         build ();               
349                 pub.setHost(props.getProperty("host"));
350                 if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
351                                 
352                         pub.setAuthKey(props.getProperty("authKey"));
353                         pub.setAuthDate(props.getProperty("authDate"));
354                         pub.setUsername(props.getProperty("username"));
355                         pub.setPassword(props.getProperty("password"));
356                 }else{
357                         pub.setUsername(props.getProperty("username"));
358                         pub.setPassword(props.getProperty("password"));
359                 }
360                 pub.setProducerFilePath(producerFilePath);
361                 pub.setProtocolFlag(props.getProperty("TransportType"));
362                 pub.setProps(props);
363                 routeFilePath=props.getProperty("DME2preferredRouterFilePath");
364                 routeReader= new FileReader(new File (routeFilePath));
365                 prop= new Properties();
366                 File fo= new File(routeFilePath);
367                 if(!fo.exists()){
368                         routeWriter=new FileWriter(new File(routeFilePath));
369                 }
370                 //pub.setContentType(contentType);
371                 return pub;
372         }
373         
374         /**
375          * Create a publisher that will contain send methods that return 
376          * response object to user. 
377          * @param producerFilePath set all properties for publishing message
378          * @return MRBatchingPublisher obj
379          * @throws FileNotFoundException exc
380          * @throws IOException ioex
381          */
382         public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException      {
383                 FileReader reader = new FileReader(new File (producerFilePath));
384                 Properties props = new Properties();            
385                 props.load(reader);
386                 MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
387                         againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
388                         onTopic ( props.getProperty("topic") ).
389                         batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
390                         compress (Boolean.parseBoolean(props.getProperty("compress"))).
391                         httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
392                         withResponse(withResponse).
393                         build ();               
394                 pub.setHost(props.getProperty("host"));
395                 if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
396                                 
397                         pub.setAuthKey(props.getProperty("authKey"));
398                         pub.setAuthDate(props.getProperty("authDate"));
399                         pub.setUsername(props.getProperty("username"));
400                         pub.setPassword(props.getProperty("password"));
401                 }else{
402                         pub.setUsername(props.getProperty("username"));
403                         pub.setPassword(props.getProperty("password"));
404                 }
405                 pub.setProducerFilePath(producerFilePath);
406                 pub.setProtocolFlag(props.getProperty("TransportType"));
407                 pub.setProps(props);
408                 routeFilePath=props.getProperty("DME2preferredRouterFilePath");
409                 routeReader= new FileReader(new File (routeFilePath));
410                 prop= new Properties();
411                 File fo= new File(routeFilePath);
412                 if(!fo.exists()){
413                         routeWriter=new FileWriter(new File(routeFilePath));
414                 }
415                 //pub.setContentType(contentType);
416                 return pub;
417         }
418         
419         
420         
421
422         
423         
424         
425         
426         
427
428         
429         /**
430          * Create an identity manager client to work with API keys.
431          * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
432          * @param apiKey Your API key
433          * @param apiSecret Your API secret
434          * @return an identity manager
435          */
436         public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret )
437         {
438                 MRIdentityManager cim;
439                 try {
440                         cim = new MRMetaClient ( hostSet );
441                 } catch (MalformedURLException e) {
442                         throw new RuntimeException(e);
443                 }
444                 cim.setApiCredentials ( apiKey, apiSecret );
445                 return cim;
446         }
447
448         /**
449          * Create a topic manager for working with topics.
450          * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
451          * @param apiKey Your API key
452          * @param apiSecret Your API secret
453          * @return a topic manager
454          */
455         public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret )
456         {
457                 MRMetaClient tmi;
458                 try {
459                         tmi = new MRMetaClient ( hostSet );
460                 } catch (MalformedURLException e) {
461                         throw new RuntimeException(e);
462                 }
463                 tmi.setApiCredentials ( apiKey, apiSecret );
464                 return tmi;
465         }
466
467         /**
468          * Inject a consumer. Used to support unit tests.
469          * @param cc
470          */
471         public static void $testInject ( MRConsumer cc )
472         {
473                 MRClientBuilders.sfConsumerMock = cc;
474         }
475
476         public static MRConsumer createConsumer(String host, String topic, String username,
477                         String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) {
478
479                 MRConsumerImpl sub;
480                 try {
481                         sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
482                 } catch (MalformedURLException e) {
483                         throw new RuntimeException(e);
484                 }
485                 sub.setUsername(username);
486                 sub.setPassword(password);
487                 sub.setHost(host);
488                 sub.setProtocolFlag(protocalFlag);
489                 sub.setConsumerFilePath(consumerFilePath);
490                 return sub;
491         
492         }
493         
494         public static MRConsumer createConsumer(String host, String topic, String username,
495                         String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) {
496
497                 MRConsumerImpl sub;
498                 try {
499                         sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
500                 } catch (MalformedURLException e) {
501                         throw new RuntimeException(e);
502                 }
503                 sub.setUsername(username);
504                 sub.setPassword(password);
505                 sub.setHost(host);
506                 sub.setProtocolFlag(protocalFlag);
507                 sub.setConsumerFilePath(consumerFilePath);
508                 return sub;
509         
510         }
511
512         public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException {
513                 FileReader reader = new FileReader(new File (consumerFilePath));
514                 Properties props = new Properties();            
515                 props.load(reader);
516                 int timeout;
517                 if(props.getProperty("timeout")!=null)
518                         timeout=Integer.parseInt(props.getProperty("timeout"));
519                 else
520                         timeout=-1;
521                 int limit;
522                 if(props.getProperty("limit")!=null)
523                         limit=Integer.parseInt(props.getProperty("limit"));
524                 else
525                         limit=-1;
526                 String group;
527                 if(props.getProperty("group")==null)
528                 group=UUID.randomUUID ().toString();
529                 else
530                         group=props.getProperty("group");
531                 MRConsumerImpl sub=null;
532                 if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
533                         sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout,  limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate")  );
534                         sub.setAuthKey(props.getProperty("authKey"));
535                         sub.setAuthDate(props.getProperty("authDate"));
536                         sub.setUsername(props.getProperty("username"));
537                         sub.setPassword(props.getProperty("password"));
538                 }else{
539                         sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password")  );
540                         sub.setUsername(props.getProperty("username"));
541                         sub.setPassword(props.getProperty("password"));
542                 }
543                 sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
544             sub.setProps(props);
545                 sub.setHost(props.getProperty("host"));
546                 sub.setProtocolFlag(props.getProperty("TransportType"));
547                 //sub.setConsumerFilePath(consumerFilePath);
548                 sub.setfFilter(props.getProperty("filter"));
549                 routeFilePath=props.getProperty("DME2preferredRouterFilePath");
550                 routeReader= new FileReader(new File (routeFilePath));
551                 prop= new Properties();
552                 File fo= new File(routeFilePath);
553                 if(!fo.exists()){
554                                 routeWriter=new FileWriter(new File(routeFilePath));
555                 }
556                 return sub;
557         }
558 }