additional mr client 41/55541/1
authorSmokowski, Kevin (ks6305) <ks6305@att.com>
Thu, 28 Jun 2018 20:56:05 +0000 (20:56 +0000)
committerSmokowski, Kevin (ks6305) <ks6305@att.com>
Thu, 28 Jun 2018 20:56:05 +0000 (20:56 +0000)
additional mr client, fewer dependencies

Change-Id: I36168fd6e82846a889cd9a01aadf2462bb767723
Issue-ID: CCSDK-327
Signed-off-by: Smokowski, Kevin (ks6305) <ks6305@att.com>
dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java [new file with mode: 0644]

diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClientJdk.java
new file mode 100644 (file)
index 0000000..d720e5f
--- /dev/null
@@ -0,0 +1,211 @@
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ * openECOMP : SDN-C\r
+ * ================================================================================\r
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights\r
+ *                                             reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ * \r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * \r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+\r
+package org.onap.ccsdk.sli.northbound.dmaapclient;\r
+\r
+import java.io.BufferedReader;\r
+import java.io.File;\r
+import java.io.FileInputStream;\r
+import java.io.FileNotFoundException;\r
+import java.io.IOException;\r
+import java.io.InputStreamReader;\r
+import java.io.UnsupportedEncodingException;\r
+import java.net.MalformedURLException;\r
+import java.net.URL;\r
+import java.net.URLEncoder;\r
+import java.nio.charset.StandardCharsets;\r
+import java.util.Base64;\r
+import java.util.Properties;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import java.net.HttpURLConnection;\r
+\r
+/*\r
+ * java.net based client to build message router consumers\r
+ */\r
+public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {\r
+    private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);\r
+\r
+    protected Boolean isReady = false;\r
+    protected Boolean isRunning = false;\r
+    protected URL url;\r
+    protected Integer fetchPause;\r
+    protected Properties properties;\r
+    protected final String DEFAULT_CONNECT_TIMEOUT = "30000";\r
+    protected final String DEFAULT_READ_TIMEOUT = "180000";\r
+    protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";\r
+    protected final String DEFAULT_LIMIT = null;\r
+    private String authorizationString;\r
+    protected Integer connectTimeout;\r
+    protected Integer readTimeout;\r
+    protected String topic;\r
+\r
+    public MessageRouterHttpClientJdk() {}\r
+\r
+    @Override\r
+    public void run() {\r
+        if (isReady) {\r
+            isRunning = true;\r
+            while (isRunning) {\r
+                HttpURLConnection httpUrlConnection = null;\r
+                try {\r
+                    httpUrlConnection = (HttpURLConnection) url.openConnection();\r
+                    if (authorizationString != null) {\r
+                        httpUrlConnection.addRequestProperty("Authorization", authorizationString);\r
+                    }\r
+                    httpUrlConnection.setRequestMethod("GET");\r
+                    httpUrlConnection.setRequestProperty("Accept", "application/json");\r
+                    httpUrlConnection.setUseCaches(false);\r
+                    httpUrlConnection.setConnectTimeout(connectTimeout);\r
+                    httpUrlConnection.setReadTimeout(readTimeout);\r
+                    httpUrlConnection.connect();\r
+                    int status = httpUrlConnection.getResponseCode();\r
+                    Log.info("GET " + url + " returned http status " + status);\r
+                    if (status < 300) {\r
+                        BufferedReader br =\r
+                                new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));\r
+                        StringBuilder sb = new StringBuilder();\r
+                        String line;\r
+                        while ((line = br.readLine()) != null) {\r
+                            sb.append(line + "\n");\r
+                        }\r
+                        br.close();\r
+                        String responseBody = sb.toString();\r
+                        if (responseBody.contains("{")) {\r
+                            // Get rid of opening [" entity =\r
+                            responseBody = responseBody.substring(2);\r
+                            // Get rid of closing "]\r
+                            responseBody = responseBody.substring(0, responseBody.length() - 2);\r
+                            // Split the json array into individual elements to process\r
+                            for (String message : responseBody.split("\",\"")) {\r
+                                // unescape the json\r
+                                message = message.replace("\\\"", "\"");\r
+                                // Topic names cannot contain periods\r
+                                processMsg(message);\r
+                            }\r
+                        } else {\r
+                            Log.info("Entity doesn't appear to contain JSON elements, logging body");\r
+                            Log.info(responseBody);\r
+                        }\r
+                    }\r
+                } catch (Exception e) {\r
+                    Log.error("GET " + url + " failed.", e);\r
+                } finally {\r
+                    if (httpUrlConnection != null) {\r
+                        httpUrlConnection.disconnect();\r
+                    }\r
+                    Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");\r
+                    try {\r
+                        Thread.sleep(fetchPause);\r
+                    } catch (InterruptedException e) {\r
+                        Log.error("Could not sleep thread", e);\r
+                    }\r
+                }\r
+            }\r
+        }\r
+    }\r
+\r
+    @Override\r
+    public void init(Properties baseProperties, String consumerPropertiesPath) {\r
+        try {\r
+            baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));\r
+\r
+            this.properties = baseProperties;\r
+            String username = properties.getProperty("username");\r
+            String password = properties.getProperty("password");\r
+            topic = properties.getProperty("topic");\r
+            String group = properties.getProperty("group");\r
+            String host = properties.getProperty("host");\r
+            String id = properties.getProperty("id");\r
+\r
+            String filter = properties.getProperty("filter");\r
+            if (filter != null) {\r
+                if (filter.length() > 0) {\r
+                    try {\r
+                        filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());\r
+                    } catch (UnsupportedEncodingException e) {\r
+                        Log.error("Couldn't encode filter string", e);\r
+                    }\r
+                } else {\r
+                    filter = null;\r
+                }\r
+            }\r
+\r
+            String limitString = properties.getProperty("limit", DEFAULT_LIMIT);\r
+            Integer limit = null;\r
+            if (limitString != null && limitString.length() > 0) {\r
+                limit = Integer.valueOf(limitString);\r
+            }\r
+\r
+            Integer timeoutQueryParamValue =\r
+                    Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));\r
+            connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));\r
+            readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));\r
+            if (username != null && password != null && username.length() > 0 && password.length() > 0) {\r
+                authorizationString = buildAuthorizationString(username, password);\r
+            }\r
+            String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);\r
+            this.url = new URL(urlString);\r
+            this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause"));\r
+            this.isReady = true;\r
+        } catch (FileNotFoundException e) {\r
+            Log.error("FileNotFoundException while reading consumer properties", e);\r
+        } catch (IOException e) {\r
+            Log.error("IOException while reading consumer properties", e);\r
+        }\r
+    }\r
+\r
+    public void processMsg(String msg) {\r
+        Log.info(msg);\r
+    }\r
+\r
+    protected String buildAuthorizationString(String userName, String password) {\r
+        String basicAuthString = userName + ":" + password;\r
+        basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());\r
+        return "Basic " + basicAuthString;\r
+    }\r
+\r
+    protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,\r
+            Integer timeout, Integer limit, String filter) {\r
+        StringBuilder sb = new StringBuilder();\r
+        sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);\r
+        sb.append("?timeout=" + timeout);\r
+\r
+        if (limit != null) {\r
+            sb.append("&limit=" + limit);\r
+        }\r
+        if (filter != null) {\r
+            sb.append("&filter=" + filter);\r
+        }\r
+        return sb.toString();\r
+    }\r
+\r
+    @Override\r
+    public boolean isReady() {\r
+        return isReady;\r
+    }\r
+\r
+    @Override\r
+    public boolean isRunning() {\r
+        return isRunning;\r
+    }\r
+\r
+}\r