051380e1a98001c39f5736a228a99e8f16cf3092
[ccsdk/sli/adaptors.git] /
1 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;\r
2 \r
3 import java.io.UnsupportedEncodingException;\r
4 import java.net.URLEncoder;\r
5 import java.nio.charset.StandardCharsets;\r
6 import java.util.Properties;\r
7 \r
8 import org.slf4j.Logger;\r
9 import org.slf4j.LoggerFactory;\r
10 \r
11 public class ConsumerFactory {\r
12     private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactory.class);\r
13 \r
14     // Default values to minimize required configuration\r
15     private static final int DEFAULT_FETCH_PAUSE = 5000;\r
16     private static final int DEFAULT_CONNECT_TIMEOUT = 30000;\r
17     private static final int DEFAULT_READ_TIMEOUT = 180000;\r
18     private static final int DEFAULT_LIMIT = 5; // Limits the number of messages pulled in a single GET request\r
19     private static final int DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = 15000;\r
20     private static final String DEFAULT_AUTH_METHOD = "basic";\r
21 \r
22     // Required properties\r
23     protected final String username;\r
24     protected final String password;\r
25     protected final String host;\r
26     private final String group;\r
27     private final String id;\r
28 \r
29     // Optional properties\r
30     protected Integer connectTimeout;\r
31     protected Integer readTimeout;\r
32     private Integer fetchPause;\r
33     private Integer limit;\r
34     private Integer timeoutQueryParamValue;\r
35     private String filter;\r
36     protected String auth;\r
37 \r
38     public String getAuth() {\r
39         return auth;\r
40     }\r
41 \r
42     public void setAuth(String auth) {\r
43         this.auth = auth;\r
44     }\r
45 \r
46     public Integer getConnectTimeout() {\r
47         return connectTimeout;\r
48     }\r
49 \r
50     public void setConnectTimeout(Integer connectTimeout) {\r
51         this.connectTimeout = connectTimeout;\r
52     }\r
53 \r
54     public Integer getReadTimeout() {\r
55         return readTimeout;\r
56     }\r
57 \r
58     public void setReadTimeout(Integer readTimeout) {\r
59         this.readTimeout = readTimeout;\r
60     }\r
61 \r
62     public Integer getFetchPause() {\r
63         return fetchPause;\r
64     }\r
65 \r
66     public void setFetchPause(Integer fetchPause) {\r
67         this.fetchPause = fetchPause;\r
68     }\r
69 \r
70     public Integer getLimit() {\r
71         return limit;\r
72     }\r
73 \r
74     public void setLimit(Integer limit) {\r
75         this.limit = limit;\r
76     }\r
77 \r
78     public Integer getTimeoutQueryParamValue() {\r
79         return timeoutQueryParamValue;\r
80     }\r
81 \r
82     public void setTimeoutQueryParamValue(Integer timeoutQueryParamValue) {\r
83         this.timeoutQueryParamValue = timeoutQueryParamValue;\r
84     }\r
85 \r
86     public String getFilter() {\r
87         return filter;\r
88     }\r
89 \r
90     public void setFilter(String filter) {\r
91         processFilter(filter);\r
92     }\r
93 \r
94     public ConsumerFactory(String username, String password, String host, String group, String id, Integer connectTimeout, Integer readTimeout) {\r
95         this.username = username;\r
96         this.password = password;\r
97         this.host = host;\r
98         this.group = group;\r
99         this.id = id;\r
100         setDefaults();\r
101     }\r
102 \r
103     public ConsumerFactory(Properties properties) {\r
104         // Required properties\r
105         username = properties.getProperty("username");\r
106         password = properties.getProperty("password");\r
107         host = properties.getProperty("host");\r
108         auth = properties.getProperty("auth");\r
109         group = properties.getProperty("group");\r
110         id = properties.getProperty("id");\r
111 \r
112         // Optional properties\r
113         connectTimeout = readOptionalInteger(properties, "connectTimeoutSeconds");\r
114         readTimeout = readOptionalInteger(properties, "readTimeoutMinutes");\r
115         fetchPause = readOptionalInteger(properties, "fetchPause");\r
116         limit = readOptionalInteger(properties, "limit");\r
117         timeoutQueryParamValue = readOptionalInteger(properties, "timeout");\r
118         processFilter(properties.getProperty("filter"));\r
119 \r
120         setDefaults();\r
121     }\r
122 \r
123     private Integer readOptionalInteger(Properties properties, String propertyName) {\r
124         String stringValue = properties.getProperty(propertyName);\r
125         if (stringValue != null && stringValue.length() > 0) {\r
126             try {\r
127                 return Integer.valueOf(stringValue);\r
128             } catch (NumberFormatException e) {\r
129                 LOG.error("property " + propertyName + " had the value " + stringValue + " that could not be converted to an Integer", e);\r
130             }\r
131         }\r
132         return null;\r
133     }\r
134 \r
135     public PollingConsumerImpl createPollingClient() {\r
136         return new PollingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);\r
137     }\r
138 \r
139     public PullingConsumerImpl createPullingClient() {\r
140         return new PullingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);\r
141     }\r
142 \r
143     private void processFilter(String filterString) {\r
144         if (filterString != null) {\r
145             if (filterString.length() > 0) {\r
146                 try {\r
147                     filter = URLEncoder.encode(filterString, StandardCharsets.UTF_8.name());\r
148                 } catch (UnsupportedEncodingException e) {\r
149                     LOG.warn("Couldn't encode filter string. Filter will be ignored.", e);\r
150                     filter = null;\r
151                 }\r
152             } else {\r
153                 filter = null;\r
154             }\r
155         }\r
156     }\r
157 \r
158     private void setDefaults() {\r
159         if (connectTimeout == null) {\r
160             connectTimeout = DEFAULT_CONNECT_TIMEOUT;\r
161         }\r
162         if (readTimeout == null) {\r
163             readTimeout = DEFAULT_READ_TIMEOUT;\r
164         }\r
165         if (fetchPause == null) {\r
166             fetchPause = DEFAULT_FETCH_PAUSE;\r
167         }\r
168         if (limit == null) {\r
169             limit = DEFAULT_LIMIT;\r
170         }\r
171         if (timeoutQueryParamValue == null) {\r
172             timeoutQueryParamValue = DEFAULT_TIMEOUT_QUERY_PARAM_VALUE;\r
173         }\r
174         if (auth == null) {\r
175             auth = DEFAULT_AUTH_METHOD;\r
176         }\r
177     }\r
178 \r
179 }\r