[POLICY-73] replace openecomp for policy-engine
[policy/engine.git] / PolicyEngineUtils / src / main / java / org / onap / policy / utils / BusConsumer.java
1 package org.onap.policy.utils;
2
3 import java.util.List;
4 import java.util.Properties;
5
6 import com.att.nsa.mr.client.impl.MRConsumerImpl;
7 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
8
9 public interface BusConsumer {
10         
11         /**
12          * fetch messages
13          * 
14          * @return list of messages
15          * @throws Exception when error encountered by underlying libraries
16          */
17         public Iterable<String> fetch() throws Exception;
18         
19         /**
20          * close underlying library consumer
21          */
22         public void close();
23         
24         /**
25          * MR based consumer
26          */
27         public static class DmaapConsumerWrapper implements BusConsumer {
28                 
29                 /**
30                  * MR Consumer
31                  */
32                 protected MRConsumerImpl consumer;
33                 
34                 /**
35                  * MR Consumer Wrapper
36                  * 
37                  * @param servers messaging bus hosts
38                  * @param topic topic
39                  * @param apiKey API Key
40                  * @param apiSecret API Secret
41                  * @param aafLogin AAF Login
42                  * @param aafPassword AAF Password
43                  * @param consumerGroup Consumer Group
44                  * @param consumerInstance Consumer Instance
45                  * @param fetchTimeout Fetch Timeout
46                  * @param fetchLimit Fetch Limit
47                  */
48                 public DmaapConsumerWrapper(List<String> servers, String topic, 
49                                                                 String aafLogin, String aafPassword,
50                                                                 String consumerGroup, String consumerInstance,
51                                                                 int fetchTimeout, int fetchLimit) 
52                 throws Exception {
53                                         
54                         this.consumer = new MRConsumerImpl(servers, topic, 
55                                                                                            consumerGroup, consumerInstance, 
56                                                                                            fetchTimeout, fetchLimit, 
57                                                                                    null, aafLogin, aafPassword);
58                         
59                         this.consumer.setUsername(aafLogin);
60                         this.consumer.setPassword(aafPassword);
61                         
62                         this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
63                         
64                         Properties props = new Properties();
65                         props.setProperty("Protocol", "http");
66                         this.consumer.setProps(props);
67                         this.consumer.setHost(servers.get(0) + ":3904");
68                 }
69                 
70                 /**
71                  * {@inheritDoc}
72                  */
73                 public Iterable<String> fetch() throws Exception {
74                         return this.consumer.fetch();
75                 }
76                 
77                 /**
78                  * {@inheritDoc}
79                  */
80                 public void close() {
81                         this.consumer.close();
82                 }
83                 
84                 @Override
85                 public String toString() {
86                         StringBuilder builder = new StringBuilder();
87                         builder.
88                         append("DmaapConsumerWrapper [").
89                         append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
90                         append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
91                         append(", consumer.getHost()=").append(consumer.getHost()).
92                         append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
93                         append(", consumer.getUsername()=").append(consumer.getUsername()).
94                         append("]");
95                         return builder.toString();
96                 }
97         }
98
99 }