0802bec2f3fd57b074470d64201d7ee560366f16
[ccsdk/sli.git] /
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * openECOMP : SDN-C\r
4  * ================================================================================\r
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights\r
6  *                      reserved.\r
7  *                      \r
8  * Modifications Copyright (C) 2019 IBM.\r
9  * ================================================================================\r
10  * Licensed under the Apache License, Version 2.0 (the "License");\r
11  * you may not use this file except in compliance with the License.\r
12  * You may obtain a copy of the License at\r
13  *\r
14  *      http://www.apache.org/licenses/LICENSE-2.0\r
15  *\r
16  * Unless required by applicable law or agreed to in writing, software\r
17  * distributed under the License is distributed on an "AS IS" BASIS,\r
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
19  * See the License for the specific language governing permissions and\r
20  * limitations under the License.\r
21  * ============LICENSE_END=========================================================\r
22  */\r
23 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;\r
24 \r
25 import java.io.UnsupportedEncodingException;\r
26 import java.net.URLEncoder;\r
27 import java.nio.charset.StandardCharsets;\r
28 import java.util.Properties;\r
29 \r
30 import org.slf4j.Logger;\r
31 import org.slf4j.LoggerFactory;\r
32 \r
33 public class ConsumerFactory {\r
34     private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactory.class);\r
35 \r
36     // Default values to minimize required configuration\r
37     private static final int DEFAULT_FETCH_PAUSE = 5000;\r
38     private static final int DEFAULT_CONNECT_TIMEOUT = 30000;\r
39     private static final int DEFAULT_READ_TIMEOUT = 180000;\r
40     private static final int DEFAULT_LIMIT = 5; // Limits the number of messages pulled in a single GET request\r
41     private static final int DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = 15000;\r
42     private static final String DEFAULT_AUTH_METHOD = "basic";\r
43 \r
44     // Required properties\r
45     protected final String username;\r
46     protected final String password;\r
47     protected final String host;\r
48     private final String group;\r
49     private final String id;\r
50 \r
51     // Optional properties\r
52     protected Integer connectTimeout;\r
53     protected Integer readTimeout;\r
54     private Integer fetchPause;\r
55     private Integer limit;\r
56     private Integer timeoutQueryParamValue;\r
57     private String filter;\r
58     protected String auth;\r
59     \r
60     public ConsumerFactory(Properties properties) {\r
61         // Required properties\r
62         username = properties.getProperty("username");\r
63         password = properties.getProperty("password");\r
64         host = properties.getProperty("host");\r
65         auth = properties.getProperty("auth");\r
66         group = properties.getProperty("group");\r
67         id = properties.getProperty("id");\r
68 \r
69         // Optional properties\r
70         connectTimeout = readOptionalInteger(properties, "connectTimeoutSeconds");\r
71         readTimeout = readOptionalInteger(properties, "readTimeoutMinutes");\r
72         fetchPause = readOptionalInteger(properties, "fetchPause");\r
73         limit = readOptionalInteger(properties, "limit");\r
74         timeoutQueryParamValue = readOptionalInteger(properties, "timeout");\r
75         processFilter(properties.getProperty("filter"));\r
76 \r
77         setDefaults();\r
78         }\r
79     \r
80     public String getAuth() {\r
81         return auth;\r
82     }\r
83 \r
84     public void setAuth(String auth) {\r
85         this.auth = auth;\r
86     }\r
87 \r
88     public Integer getConnectTimeout() {\r
89         return connectTimeout;\r
90     }\r
91 \r
92     public void setConnectTimeout(Integer connectTimeout) {\r
93         this.connectTimeout = connectTimeout;\r
94     }\r
95 \r
96     public Integer getReadTimeout() {\r
97         return readTimeout;\r
98     }\r
99 \r
100     public void setReadTimeout(Integer readTimeout) {\r
101         this.readTimeout = readTimeout;\r
102     }\r
103 \r
104     public Integer getFetchPause() {\r
105         return fetchPause;\r
106     }\r
107 \r
108     public void setFetchPause(Integer fetchPause) {\r
109         this.fetchPause = fetchPause;\r
110     }\r
111 \r
112     public Integer getLimit() {\r
113         return limit;\r
114     }\r
115 \r
116     public void setLimit(Integer limit) {\r
117         this.limit = limit;\r
118     }\r
119 \r
120     public Integer getTimeoutQueryParamValue() {\r
121         return timeoutQueryParamValue;\r
122     }\r
123 \r
124     public void setTimeoutQueryParamValue(Integer timeoutQueryParamValue) {\r
125         this.timeoutQueryParamValue = timeoutQueryParamValue;\r
126     }\r
127 \r
128     public String getFilter() {\r
129         return filter;\r
130     }\r
131 \r
132     public void setFilter(String filter) {\r
133         processFilter(filter);\r
134     }\r
135 \r
136     public ConsumerFactory(String username, String password, String host, String group, String id, Integer connectTimeout, Integer readTimeout) {\r
137         this.username = username;\r
138         this.password = password;\r
139         this.host = host;\r
140         this.group = group;\r
141         this.id = id;\r
142         setDefaults();\r
143     }\r
144 \r
145     private Integer readOptionalInteger(Properties properties, String propertyName) {\r
146         String stringValue = properties.getProperty(propertyName);\r
147         if (stringValue != null && stringValue.length() > 0) {\r
148             try {\r
149                 return Integer.valueOf(stringValue);\r
150             } catch (NumberFormatException e) {\r
151                 LOG.error("property " + propertyName + " had the value " + stringValue + " that could not be converted to an Integer", e);\r
152             }\r
153         }\r
154         return null;\r
155     }\r
156 \r
157     public PollingConsumerImpl createPollingClient() {\r
158         return new PollingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);\r
159     }\r
160 \r
161     public PullingConsumerImpl createPullingClient() {\r
162         return new PullingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);\r
163     }\r
164 \r
165     private void processFilter(String filterString) {\r
166         if (filterString != null) {\r
167             if (filterString.length() > 0) {\r
168                 try {\r
169                     filter = URLEncoder.encode(filterString, StandardCharsets.UTF_8.name());\r
170                 } catch (UnsupportedEncodingException e) {\r
171                     LOG.warn("Couldn't encode filter string. Filter will be ignored.", e);\r
172                     filter = null;\r
173                 }\r
174             } else {\r
175                 filter = null;\r
176             }\r
177         }\r
178     }\r
179 \r
180     private void setDefaults() {\r
181         if (connectTimeout == null) {\r
182             connectTimeout = DEFAULT_CONNECT_TIMEOUT;\r
183         }\r
184         if (readTimeout == null) {\r
185             readTimeout = DEFAULT_READ_TIMEOUT;\r
186         }\r
187         if (fetchPause == null) {\r
188             fetchPause = DEFAULT_FETCH_PAUSE;\r
189         }\r
190         if (limit == null) {\r
191             limit = DEFAULT_LIMIT;\r
192         }\r
193         if (timeoutQueryParamValue == null) {\r
194             timeoutQueryParamValue = DEFAULT_TIMEOUT_QUERY_PARAM_VALUE;\r
195         }\r
196         if (auth == null) {\r
197             auth = DEFAULT_AUTH_METHOD;\r
198         }\r
199     }\r
200 \r
201 }\r