Release version 1.1.0 of sli/northbound
[ccsdk/sli/northbound.git] / dmaap-listener / src / main / java / org / onap / ccsdk / sli / northbound / dmaapclient / MessageRouterHttpClientJdk.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : SDN-C
4  * ================================================================================
5  * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
6  *                                              reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.ccsdk.sli.northbound.dmaapclient;
23
24 import java.io.BufferedReader;
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.io.FileNotFoundException;
28 import java.io.IOException;
29 import java.io.InputStreamReader;
30 import java.io.UnsupportedEncodingException;
31 import java.net.HttpURLConnection;
32 import java.net.MalformedURLException;
33 import java.net.URL;
34 import java.net.URLEncoder;
35 import java.nio.charset.StandardCharsets;
36 import java.util.Base64;
37 import java.util.Properties;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /*
42  * java.net based client to build message router consumers
43  */
44 public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {
45     private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);
46
47     protected Boolean isReady = false;
48     protected Boolean isRunning = false;
49     protected URL url;
50     protected Integer fetchPause;
51     protected Properties properties;
52     protected final String DEFAULT_CONNECT_TIMEOUT = "30000";
53     protected final String DEFAULT_READ_TIMEOUT = "180000";
54     protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
55     protected final String DEFAULT_LIMIT = null;
56     protected final String DEFAULT_FETCH_PAUSE = "5000";
57
58     private String authorizationString;
59     protected Integer connectTimeout;
60     protected Integer readTimeout;
61     protected String topic;
62
63     public MessageRouterHttpClientJdk() {}
64
65     @Override
66     public void run() {
67         if (isReady) {
68             isRunning = true;
69             while (isRunning) {
70                 HttpURLConnection httpUrlConnection = null;
71                 try {
72                     httpUrlConnection = buildHttpURLConnection();
73                     httpUrlConnection.connect();
74                     int status = httpUrlConnection.getResponseCode();
75                     Log.info("GET " + url + " returned http status " + status);
76                     if (status < 300) {
77                         BufferedReader br =
78                                 new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));
79                         StringBuilder sb = new StringBuilder();
80                         String line;
81                         while ((line = br.readLine()) != null) {
82                             sb.append(line + "\n");
83                         }
84                         br.close();
85                         String responseBody = sb.toString();
86                         if (responseBody.contains("{")) {
87                             // Get rid of opening [" entity =
88                             responseBody = responseBody.substring(2);
89                             // Get rid of closing "]
90                             responseBody = responseBody.substring(0, responseBody.length() - 2);
91                             // Split the json array into individual elements to process
92                             for (String message : responseBody.split("\",\"")) {
93                                 // unescape the json
94                                 message = message.replace("\\\"", "\"");
95                                 // Topic names cannot contain periods
96                                 processMsg(message);
97                             }
98                         } else {
99                             Log.info("Entity doesn't appear to contain JSON elements, logging body");
100                             Log.info(responseBody);
101                         }
102                     }
103                 } catch (Exception e) {
104                     Log.error("GET " + url + " failed.", e);
105                 } finally {
106                     if (httpUrlConnection != null) {
107                         httpUrlConnection.disconnect();
108                     }
109                     Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");
110                     try {
111                         Thread.sleep(fetchPause);
112                     } catch (InterruptedException e) {
113                         Log.error("Could not sleep thread", e);
114                         Thread.currentThread().interrupt();
115                     }
116                 }
117             }
118         }
119     }
120
121     @Override
122     public void init(Properties baseProperties, String consumerPropertiesPath) {
123         try {
124             baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
125             processProperties(baseProperties);
126         } catch (FileNotFoundException e) {
127             Log.error("FileNotFoundException while reading consumer properties", e);
128         } catch (IOException e) {
129             Log.error("IOException while reading consumer properties", e);
130         }
131     }
132
133     protected void processProperties(Properties properties) throws MalformedURLException {
134         this.properties = properties;
135         String username = properties.getProperty("username");
136         String password = properties.getProperty("password");
137         topic = properties.getProperty("topic");
138         String group = properties.getProperty("group");
139         String host = properties.getProperty("host");
140         String id = properties.getProperty("id");
141
142         String filter = properties.getProperty("filter");
143         if (filter != null) {
144             if (filter.length() > 0) {
145                 try {
146                     filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
147                 } catch (UnsupportedEncodingException e) {
148                     Log.error("Couldn't encode filter string", e);
149                 }
150             } else {
151                 filter = null;
152             }
153         }
154
155         String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
156         Integer limit = null;
157         if (limitString != null && limitString.length() > 0) {
158             limit = Integer.valueOf(limitString);
159         }
160
161         Integer timeoutQueryParamValue =
162                 Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
163         connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));
164         readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));
165         if (username != null && password != null && username.length() > 0 && password.length() > 0) {
166             authorizationString = buildAuthorizationString(username, password);
167         }
168         String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);
169         this.url = new URL(urlString);
170         this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause", DEFAULT_FETCH_PAUSE));
171         this.isReady = true;
172     }
173
174     public void processMsg(String msg) {
175         Log.info(msg);
176     }
177
178     protected String buildAuthorizationString(String userName, String password) {
179         String basicAuthString = userName + ":" + password;
180         basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
181         return "Basic " + basicAuthString;
182     }
183
184     protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,
185             Integer timeout, Integer limit, String filter) {
186         StringBuilder sb = new StringBuilder();
187         sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);
188         sb.append("?timeout=" + timeout);
189
190         if (limit != null) {
191             sb.append("&limit=" + limit);
192         }
193         if (filter != null) {
194             sb.append("&filter=" + filter);
195         }
196         return sb.toString();
197     }
198
199     @Override
200     public boolean isReady() {
201         return isReady;
202     }
203
204     @Override
205     public boolean isRunning() {
206         return isRunning;
207     }
208
209     protected HttpURLConnection buildHttpURLConnection() throws IOException {
210         HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
211         if (authorizationString != null) {
212             httpUrlConnection.setRequestProperty("Authorization", authorizationString);
213         }
214         httpUrlConnection.setRequestMethod("GET");
215         httpUrlConnection.setRequestProperty("Accept", "application/json");
216         httpUrlConnection.setUseCaches(false);
217         httpUrlConnection.setConnectTimeout(connectTimeout);
218         httpUrlConnection.setReadTimeout(readTimeout);
219         return httpUrlConnection;
220     }
221
222 }