d720e5fcdabe4782bf0954e7843af2051dd74ea1
[ccsdk/sli.git] /
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * openECOMP : SDN-C\r
4  * ================================================================================\r
5  * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights\r
6  *                                              reserved.\r
7  * ================================================================================\r
8  * Licensed under the Apache License, Version 2.0 (the "License");\r
9  * you may not use this file except in compliance with the License.\r
10  * You may obtain a copy of the License at\r
11  * \r
12  *      http://www.apache.org/licenses/LICENSE-2.0\r
13  * \r
14  * Unless required by applicable law or agreed to in writing, software\r
15  * distributed under the License is distributed on an "AS IS" BASIS,\r
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
17  * See the License for the specific language governing permissions and\r
18  * limitations under the License.\r
19  * ============LICENSE_END=========================================================\r
20  */\r
21 \r
22 package org.onap.ccsdk.sli.northbound.dmaapclient;\r
23 \r
24 import java.io.BufferedReader;\r
25 import java.io.File;\r
26 import java.io.FileInputStream;\r
27 import java.io.FileNotFoundException;\r
28 import java.io.IOException;\r
29 import java.io.InputStreamReader;\r
30 import java.io.UnsupportedEncodingException;\r
31 import java.net.MalformedURLException;\r
32 import java.net.URL;\r
33 import java.net.URLEncoder;\r
34 import java.nio.charset.StandardCharsets;\r
35 import java.util.Base64;\r
36 import java.util.Properties;\r
37 import org.slf4j.Logger;\r
38 import org.slf4j.LoggerFactory;\r
39 import java.net.HttpURLConnection;\r
40 \r
41 /*\r
42  * java.net based client to build message router consumers\r
43  */\r
44 public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {\r
45     private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);\r
46 \r
47     protected Boolean isReady = false;\r
48     protected Boolean isRunning = false;\r
49     protected URL url;\r
50     protected Integer fetchPause;\r
51     protected Properties properties;\r
52     protected final String DEFAULT_CONNECT_TIMEOUT = "30000";\r
53     protected final String DEFAULT_READ_TIMEOUT = "180000";\r
54     protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";\r
55     protected final String DEFAULT_LIMIT = null;\r
56     private String authorizationString;\r
57     protected Integer connectTimeout;\r
58     protected Integer readTimeout;\r
59     protected String topic;\r
60 \r
61     public MessageRouterHttpClientJdk() {}\r
62 \r
63     @Override\r
64     public void run() {\r
65         if (isReady) {\r
66             isRunning = true;\r
67             while (isRunning) {\r
68                 HttpURLConnection httpUrlConnection = null;\r
69                 try {\r
70                     httpUrlConnection = (HttpURLConnection) url.openConnection();\r
71                     if (authorizationString != null) {\r
72                         httpUrlConnection.addRequestProperty("Authorization", authorizationString);\r
73                     }\r
74                     httpUrlConnection.setRequestMethod("GET");\r
75                     httpUrlConnection.setRequestProperty("Accept", "application/json");\r
76                     httpUrlConnection.setUseCaches(false);\r
77                     httpUrlConnection.setConnectTimeout(connectTimeout);\r
78                     httpUrlConnection.setReadTimeout(readTimeout);\r
79                     httpUrlConnection.connect();\r
80                     int status = httpUrlConnection.getResponseCode();\r
81                     Log.info("GET " + url + " returned http status " + status);\r
82                     if (status < 300) {\r
83                         BufferedReader br =\r
84                                 new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));\r
85                         StringBuilder sb = new StringBuilder();\r
86                         String line;\r
87                         while ((line = br.readLine()) != null) {\r
88                             sb.append(line + "\n");\r
89                         }\r
90                         br.close();\r
91                         String responseBody = sb.toString();\r
92                         if (responseBody.contains("{")) {\r
93                             // Get rid of opening [" entity =\r
94                             responseBody = responseBody.substring(2);\r
95                             // Get rid of closing "]\r
96                             responseBody = responseBody.substring(0, responseBody.length() - 2);\r
97                             // Split the json array into individual elements to process\r
98                             for (String message : responseBody.split("\",\"")) {\r
99                                 // unescape the json\r
100                                 message = message.replace("\\\"", "\"");\r
101                                 // Topic names cannot contain periods\r
102                                 processMsg(message);\r
103                             }\r
104                         } else {\r
105                             Log.info("Entity doesn't appear to contain JSON elements, logging body");\r
106                             Log.info(responseBody);\r
107                         }\r
108                     }\r
109                 } catch (Exception e) {\r
110                     Log.error("GET " + url + " failed.", e);\r
111                 } finally {\r
112                     if (httpUrlConnection != null) {\r
113                         httpUrlConnection.disconnect();\r
114                     }\r
115                     Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");\r
116                     try {\r
117                         Thread.sleep(fetchPause);\r
118                     } catch (InterruptedException e) {\r
119                         Log.error("Could not sleep thread", e);\r
120                     }\r
121                 }\r
122             }\r
123         }\r
124     }\r
125 \r
126     @Override\r
127     public void init(Properties baseProperties, String consumerPropertiesPath) {\r
128         try {\r
129             baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));\r
130 \r
131             this.properties = baseProperties;\r
132             String username = properties.getProperty("username");\r
133             String password = properties.getProperty("password");\r
134             topic = properties.getProperty("topic");\r
135             String group = properties.getProperty("group");\r
136             String host = properties.getProperty("host");\r
137             String id = properties.getProperty("id");\r
138 \r
139             String filter = properties.getProperty("filter");\r
140             if (filter != null) {\r
141                 if (filter.length() > 0) {\r
142                     try {\r
143                         filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());\r
144                     } catch (UnsupportedEncodingException e) {\r
145                         Log.error("Couldn't encode filter string", e);\r
146                     }\r
147                 } else {\r
148                     filter = null;\r
149                 }\r
150             }\r
151 \r
152             String limitString = properties.getProperty("limit", DEFAULT_LIMIT);\r
153             Integer limit = null;\r
154             if (limitString != null && limitString.length() > 0) {\r
155                 limit = Integer.valueOf(limitString);\r
156             }\r
157 \r
158             Integer timeoutQueryParamValue =\r
159                     Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));\r
160             connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));\r
161             readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));\r
162             if (username != null && password != null && username.length() > 0 && password.length() > 0) {\r
163                 authorizationString = buildAuthorizationString(username, password);\r
164             }\r
165             String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);\r
166             this.url = new URL(urlString);\r
167             this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause"));\r
168             this.isReady = true;\r
169         } catch (FileNotFoundException e) {\r
170             Log.error("FileNotFoundException while reading consumer properties", e);\r
171         } catch (IOException e) {\r
172             Log.error("IOException while reading consumer properties", e);\r
173         }\r
174     }\r
175 \r
176     public void processMsg(String msg) {\r
177         Log.info(msg);\r
178     }\r
179 \r
180     protected String buildAuthorizationString(String userName, String password) {\r
181         String basicAuthString = userName + ":" + password;\r
182         basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());\r
183         return "Basic " + basicAuthString;\r
184     }\r
185 \r
186     protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,\r
187             Integer timeout, Integer limit, String filter) {\r
188         StringBuilder sb = new StringBuilder();\r
189         sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);\r
190         sb.append("?timeout=" + timeout);\r
191 \r
192         if (limit != null) {\r
193             sb.append("&limit=" + limit);\r
194         }\r
195         if (filter != null) {\r
196             sb.append("&filter=" + filter);\r
197         }\r
198         return sb.toString();\r
199     }\r
200 \r
201     @Override\r
202     public boolean isReady() {\r
203         return isReady;\r
204     }\r
205 \r
206     @Override\r
207     public boolean isRunning() {\r
208         return isRunning;\r
209     }\r
210 \r
211 }\r