--- /dev/null
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ * openECOMP : SDN-C\r
+ * ================================================================================\r
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights\r
+ * reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ * \r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ * \r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+\r
+package org.onap.ccsdk.sli.northbound.dmaapclient;\r
+\r
+import java.io.BufferedReader;\r
+import java.io.File;\r
+import java.io.FileInputStream;\r
+import java.io.FileNotFoundException;\r
+import java.io.IOException;\r
+import java.io.InputStreamReader;\r
+import java.io.UnsupportedEncodingException;\r
+import java.net.MalformedURLException;\r
+import java.net.URL;\r
+import java.net.URLEncoder;\r
+import java.nio.charset.StandardCharsets;\r
+import java.util.Base64;\r
+import java.util.Properties;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import java.net.HttpURLConnection;\r
+\r
+/*\r
+ * java.net based client to build message router consumers\r
+ */\r
+public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {\r
+ private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);\r
+\r
+ protected Boolean isReady = false;\r
+ protected Boolean isRunning = false;\r
+ protected URL url;\r
+ protected Integer fetchPause;\r
+ protected Properties properties;\r
+ protected final String DEFAULT_CONNECT_TIMEOUT = "30000";\r
+ protected final String DEFAULT_READ_TIMEOUT = "180000";\r
+ protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";\r
+ protected final String DEFAULT_LIMIT = null;\r
+ private String authorizationString;\r
+ protected Integer connectTimeout;\r
+ protected Integer readTimeout;\r
+ protected String topic;\r
+\r
+ public MessageRouterHttpClientJdk() {}\r
+\r
+ @Override\r
+ public void run() {\r
+ if (isReady) {\r
+ isRunning = true;\r
+ while (isRunning) {\r
+ HttpURLConnection httpUrlConnection = null;\r
+ try {\r
+ httpUrlConnection = (HttpURLConnection) url.openConnection();\r
+ if (authorizationString != null) {\r
+ httpUrlConnection.addRequestProperty("Authorization", authorizationString);\r
+ }\r
+ httpUrlConnection.setRequestMethod("GET");\r
+ httpUrlConnection.setRequestProperty("Accept", "application/json");\r
+ httpUrlConnection.setUseCaches(false);\r
+ httpUrlConnection.setConnectTimeout(connectTimeout);\r
+ httpUrlConnection.setReadTimeout(readTimeout);\r
+ httpUrlConnection.connect();\r
+ int status = httpUrlConnection.getResponseCode();\r
+ Log.info("GET " + url + " returned http status " + status);\r
+ if (status < 300) {\r
+ BufferedReader br =\r
+ new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));\r
+ StringBuilder sb = new StringBuilder();\r
+ String line;\r
+ while ((line = br.readLine()) != null) {\r
+ sb.append(line + "\n");\r
+ }\r
+ br.close();\r
+ String responseBody = sb.toString();\r
+ if (responseBody.contains("{")) {\r
+ // Get rid of opening [" entity =\r
+ responseBody = responseBody.substring(2);\r
+ // Get rid of closing "]\r
+ responseBody = responseBody.substring(0, responseBody.length() - 2);\r
+ // Split the json array into individual elements to process\r
+ for (String message : responseBody.split("\",\"")) {\r
+ // unescape the json\r
+ message = message.replace("\\\"", "\"");\r
+ // Topic names cannot contain periods\r
+ processMsg(message);\r
+ }\r
+ } else {\r
+ Log.info("Entity doesn't appear to contain JSON elements, logging body");\r
+ Log.info(responseBody);\r
+ }\r
+ }\r
+ } catch (Exception e) {\r
+ Log.error("GET " + url + " failed.", e);\r
+ } finally {\r
+ if (httpUrlConnection != null) {\r
+ httpUrlConnection.disconnect();\r
+ }\r
+ Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");\r
+ try {\r
+ Thread.sleep(fetchPause);\r
+ } catch (InterruptedException e) {\r
+ Log.error("Could not sleep thread", e);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public void init(Properties baseProperties, String consumerPropertiesPath) {\r
+ try {\r
+ baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));\r
+\r
+ this.properties = baseProperties;\r
+ String username = properties.getProperty("username");\r
+ String password = properties.getProperty("password");\r
+ topic = properties.getProperty("topic");\r
+ String group = properties.getProperty("group");\r
+ String host = properties.getProperty("host");\r
+ String id = properties.getProperty("id");\r
+\r
+ String filter = properties.getProperty("filter");\r
+ if (filter != null) {\r
+ if (filter.length() > 0) {\r
+ try {\r
+ filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());\r
+ } catch (UnsupportedEncodingException e) {\r
+ Log.error("Couldn't encode filter string", e);\r
+ }\r
+ } else {\r
+ filter = null;\r
+ }\r
+ }\r
+\r
+ String limitString = properties.getProperty("limit", DEFAULT_LIMIT);\r
+ Integer limit = null;\r
+ if (limitString != null && limitString.length() > 0) {\r
+ limit = Integer.valueOf(limitString);\r
+ }\r
+\r
+ Integer timeoutQueryParamValue =\r
+ Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));\r
+ connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));\r
+ readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));\r
+ if (username != null && password != null && username.length() > 0 && password.length() > 0) {\r
+ authorizationString = buildAuthorizationString(username, password);\r
+ }\r
+ String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);\r
+ this.url = new URL(urlString);\r
+ this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause"));\r
+ this.isReady = true;\r
+ } catch (FileNotFoundException e) {\r
+ Log.error("FileNotFoundException while reading consumer properties", e);\r
+ } catch (IOException e) {\r
+ Log.error("IOException while reading consumer properties", e);\r
+ }\r
+ }\r
+\r
+ public void processMsg(String msg) {\r
+ Log.info(msg);\r
+ }\r
+\r
+ protected String buildAuthorizationString(String userName, String password) {\r
+ String basicAuthString = userName + ":" + password;\r
+ basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());\r
+ return "Basic " + basicAuthString;\r
+ }\r
+\r
+ protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,\r
+ Integer timeout, Integer limit, String filter) {\r
+ StringBuilder sb = new StringBuilder();\r
+ sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);\r
+ sb.append("?timeout=" + timeout);\r
+\r
+ if (limit != null) {\r
+ sb.append("&limit=" + limit);\r
+ }\r
+ if (filter != null) {\r
+ sb.append("&filter=" + filter);\r
+ }\r
+ return sb.toString();\r
+ }\r
+\r
+ @Override\r
+ public boolean isReady() {\r
+ return isReady;\r
+ }\r
+\r
+ @Override\r
+ public boolean isRunning() {\r
+ return isRunning;\r
+ }\r
+\r
+}\r