1 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
\r
3 import java.io.UnsupportedEncodingException;
\r
4 import java.net.URLEncoder;
\r
5 import java.nio.charset.StandardCharsets;
\r
6 import java.util.Properties;
\r
8 import org.slf4j.Logger;
\r
9 import org.slf4j.LoggerFactory;
\r
11 public class ConsumerFactory {
\r
12 private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactory.class);
\r
14 // Default values to minimize required configuration
\r
15 private static final int DEFAULT_FETCH_PAUSE = 5000;
\r
16 private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
\r
17 private static final int DEFAULT_READ_TIMEOUT = 180000;
\r
18 private static final int DEFAULT_LIMIT = 5; // Limits the number of messages pulled in a single GET request
\r
19 private static final int DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = 15000;
\r
20 private static final String DEFAULT_AUTH_METHOD = "basic";
\r
22 // Required properties
\r
23 protected final String username;
\r
24 protected final String password;
\r
25 protected final String host;
\r
26 private final String group;
\r
27 private final String id;
\r
29 // Optional properties
\r
30 protected Integer connectTimeout;
\r
31 protected Integer readTimeout;
\r
32 private Integer fetchPause;
\r
33 private Integer limit;
\r
34 private Integer timeoutQueryParamValue;
\r
35 private String filter;
\r
36 protected String auth;
\r
38 public String getAuth() {
\r
42 public void setAuth(String auth) {
\r
46 public Integer getConnectTimeout() {
\r
47 return connectTimeout;
\r
50 public void setConnectTimeout(Integer connectTimeout) {
\r
51 this.connectTimeout = connectTimeout;
\r
54 public Integer getReadTimeout() {
\r
58 public void setReadTimeout(Integer readTimeout) {
\r
59 this.readTimeout = readTimeout;
\r
62 public Integer getFetchPause() {
\r
66 public void setFetchPause(Integer fetchPause) {
\r
67 this.fetchPause = fetchPause;
\r
70 public Integer getLimit() {
\r
74 public void setLimit(Integer limit) {
\r
78 public Integer getTimeoutQueryParamValue() {
\r
79 return timeoutQueryParamValue;
\r
82 public void setTimeoutQueryParamValue(Integer timeoutQueryParamValue) {
\r
83 this.timeoutQueryParamValue = timeoutQueryParamValue;
\r
86 public String getFilter() {
\r
90 public void setFilter(String filter) {
\r
91 processFilter(filter);
\r
94 public ConsumerFactory(String username, String password, String host, String group, String id, Integer connectTimeout, Integer readTimeout) {
\r
95 this.username = username;
\r
96 this.password = password;
\r
103 public ConsumerFactory(Properties properties) {
\r
104 // Required properties
\r
105 username = properties.getProperty("username");
\r
106 password = properties.getProperty("password");
\r
107 host = properties.getProperty("host");
\r
108 auth = properties.getProperty("auth");
\r
109 group = properties.getProperty("group");
\r
110 id = properties.getProperty("id");
\r
112 // Optional properties
\r
113 connectTimeout = readOptionalInteger(properties, "connectTimeoutSeconds");
\r
114 readTimeout = readOptionalInteger(properties, "readTimeoutMinutes");
\r
115 fetchPause = readOptionalInteger(properties, "fetchPause");
\r
116 limit = readOptionalInteger(properties, "limit");
\r
117 timeoutQueryParamValue = readOptionalInteger(properties, "timeout");
\r
118 processFilter(properties.getProperty("filter"));
\r
123 private Integer readOptionalInteger(Properties properties, String propertyName) {
\r
124 String stringValue = properties.getProperty(propertyName);
\r
125 if (stringValue != null && stringValue.length() > 0) {
\r
127 return Integer.valueOf(stringValue);
\r
128 } catch (NumberFormatException e) {
\r
129 LOG.error("property " + propertyName + " had the value " + stringValue + " that could not be converted to an Integer", e);
\r
135 public PollingConsumerImpl createPollingClient() {
\r
136 return new PollingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);
\r
139 public PullingConsumerImpl createPullingClient() {
\r
140 return new PullingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
\r
143 private void processFilter(String filterString) {
\r
144 if (filterString != null) {
\r
145 if (filterString.length() > 0) {
\r
147 filter = URLEncoder.encode(filterString, StandardCharsets.UTF_8.name());
\r
148 } catch (UnsupportedEncodingException e) {
\r
149 LOG.warn("Couldn't encode filter string. Filter will be ignored.", e);
\r
158 private void setDefaults() {
\r
159 if (connectTimeout == null) {
\r
160 connectTimeout = DEFAULT_CONNECT_TIMEOUT;
\r
162 if (readTimeout == null) {
\r
163 readTimeout = DEFAULT_READ_TIMEOUT;
\r
165 if (fetchPause == null) {
\r
166 fetchPause = DEFAULT_FETCH_PAUSE;
\r
168 if (limit == null) {
\r
169 limit = DEFAULT_LIMIT;
\r
171 if (timeoutQueryParamValue == null) {
\r
172 timeoutQueryParamValue = DEFAULT_TIMEOUT_QUERY_PARAM_VALUE;
\r
174 if (auth == null) {
\r
175 auth = DEFAULT_AUTH_METHOD;
\r