1aa02c70a12f5d0a98c37cc03213c98305bf3c60
[ccsdk/sli.git] /
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * openECOMP : SDN-C\r
4  * ================================================================================\r
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights\r
6  *                      reserved.\r
7  *                      \r
8  * Modifications Copyright (C) 2019 IBM.\r
9  * ================================================================================\r
10  * Licensed under the Apache License, Version 2.0 (the "License");\r
11  * you may not use this file except in compliance with the License.\r
12  * You may obtain a copy of the License at\r
13  *\r
14  *      http://www.apache.org/licenses/LICENSE-2.0\r
15  *\r
16  * Unless required by applicable law or agreed to in writing, software\r
17  * distributed under the License is distributed on an "AS IS" BASIS,\r
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
19  * See the License for the specific language governing permissions and\r
20  * limitations under the License.\r
21  * ============LICENSE_END=========================================================\r
22  */\r
23 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;\r
24 \r
25 import java.io.UnsupportedEncodingException;\r
26 import java.net.URLEncoder;\r
27 import java.nio.charset.StandardCharsets;\r
28 import java.util.Properties;\r
29 \r
30 import org.slf4j.Logger;\r
31 import org.slf4j.LoggerFactory;\r
32 \r
33 public class ConsumerFactory {\r
34     private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactory.class);\r
35 \r
36     // Default values to minimize required configuration\r
37     private static final int DEFAULT_FETCH_PAUSE = 5000;\r
38     private static final int DEFAULT_CONNECT_TIMEOUT = 30000;\r
39     private static final int DEFAULT_READ_TIMEOUT = 180000;\r
40     private static final int DEFAULT_LIMIT = 5; // Limits the number of messages pulled in a single GET request\r
41     private static final int DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = 15000;\r
42     private static final String DEFAULT_AUTH_METHOD = "basic";\r
43 \r
44     // Required properties\r
45     protected final String username;\r
46     protected final String password;\r
47     protected final String host;\r
48     private final String group;\r
49     private final String id;\r
50 \r
51     // Optional properties\r
52     protected Integer connectTimeout;\r
53     protected Integer readTimeout;\r
54     private Integer fetchPause;\r
55     private Integer limit;\r
56     private Integer timeoutQueryParamValue;\r
57     private String filter;\r
58     protected String auth;\r
59     \r
60     public ConsumerFactory(Properties properties) {\r
61         // Required properties\r
62         username = properties.getProperty("username");\r
63         password = properties.getProperty("password");\r
64         host = properties.getProperty("host");\r
65         auth = properties.getProperty("auth");\r
66         group = properties.getProperty("group");\r
67         id = properties.getProperty("id");\r
68 \r
69         // Optional properties\r
70         connectTimeout = readOptionalInteger(properties, "connectTimeoutSeconds");\r
71         readTimeout = readOptionalInteger(properties, "readTimeoutMinutes");\r
72         fetchPause = readOptionalInteger(properties, "fetchPause");\r
73         limit = readOptionalInteger(properties, "limit");\r
74         timeoutQueryParamValue = readOptionalInteger(properties, "timeout");\r
75         processFilter(properties.getProperty("filter"));\r
76 \r
77         setDefaults();\r
78         }\r
79     \r
80     public ConsumerFactory(String username, String password, String host, String group, String id, Integer connectTimeout, Integer readTimeout) {\r
81         this.username = username;\r
82         this.password = password;\r
83         this.host = host;\r
84         this.group = group;\r
85         this.id = id;\r
86         setDefaults();\r
87         }\r
88 \r
89     \r
90     public String getAuth() {\r
91         return auth;\r
92     }\r
93 \r
94     public void setAuth(String auth) {\r
95         this.auth = auth;\r
96     }\r
97 \r
98     public Integer getConnectTimeout() {\r
99         return connectTimeout;\r
100     }\r
101 \r
102     public void setConnectTimeout(Integer connectTimeout) {\r
103         this.connectTimeout = connectTimeout;\r
104     }\r
105 \r
106     public Integer getReadTimeout() {\r
107         return readTimeout;\r
108     }\r
109 \r
110     public void setReadTimeout(Integer readTimeout) {\r
111         this.readTimeout = readTimeout;\r
112     }\r
113 \r
114     public Integer getFetchPause() {\r
115         return fetchPause;\r
116     }\r
117 \r
118     public void setFetchPause(Integer fetchPause) {\r
119         this.fetchPause = fetchPause;\r
120     }\r
121 \r
122     public Integer getLimit() {\r
123         return limit;\r
124     }\r
125 \r
126     public void setLimit(Integer limit) {\r
127         this.limit = limit;\r
128     }\r
129 \r
130     public Integer getTimeoutQueryParamValue() {\r
131         return timeoutQueryParamValue;\r
132     }\r
133 \r
134     public void setTimeoutQueryParamValue(Integer timeoutQueryParamValue) {\r
135         this.timeoutQueryParamValue = timeoutQueryParamValue;\r
136     }\r
137 \r
138     public String getFilter() {\r
139         return filter;\r
140     }\r
141 \r
142     public void setFilter(String filter) {\r
143         processFilter(filter);\r
144     }\r
145 \r
146     private Integer readOptionalInteger(Properties properties, String propertyName) {\r
147         String stringValue = properties.getProperty(propertyName);\r
148         if (stringValue != null && stringValue.length() > 0) {\r
149             try {\r
150                 return Integer.valueOf(stringValue);\r
151             } catch (NumberFormatException e) {\r
152                 LOG.error("property " + propertyName + " had the value " + stringValue + " that could not be converted to an Integer", e);\r
153             }\r
154         }\r
155         return null;\r
156     }\r
157 \r
158     public PollingConsumerImpl createPollingClient() {\r
159         return new PollingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);\r
160     }\r
161 \r
162     public PullingConsumerImpl createPullingClient() {\r
163         return new PullingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);\r
164     }\r
165 \r
166     private void processFilter(String filterString) {\r
167         if (filterString != null) {\r
168             if (filterString.length() > 0) {\r
169                 try {\r
170                     filter = URLEncoder.encode(filterString, StandardCharsets.UTF_8.name());\r
171                 } catch (UnsupportedEncodingException e) {\r
172                     LOG.warn("Couldn't encode filter string. Filter will be ignored.", e);\r
173                     filter = null;\r
174                 }\r
175             } else {\r
176                 filter = null;\r
177             }\r
178         }\r
179     }\r
180 \r
181     private void setDefaults() {\r
182         if (connectTimeout == null) {\r
183             connectTimeout = DEFAULT_CONNECT_TIMEOUT;\r
184         }\r
185         if (readTimeout == null) {\r
186             readTimeout = DEFAULT_READ_TIMEOUT;\r
187         }\r
188         if (fetchPause == null) {\r
189             fetchPause = DEFAULT_FETCH_PAUSE;\r
190         }\r
191         if (limit == null) {\r
192             limit = DEFAULT_LIMIT;\r
193         }\r
194         if (timeoutQueryParamValue == null) {\r
195             timeoutQueryParamValue = DEFAULT_TIMEOUT_QUERY_PARAM_VALUE;\r
196         }\r
197         if (auth == null) {\r
198             auth = DEFAULT_AUTH_METHOD;\r
199         }\r
200     }\r
201 \r
202 }\r