2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.sli.northbound.dmaapclient;
24 import java.io.BufferedReader;
26 import java.io.FileInputStream;
27 import java.io.FileNotFoundException;
28 import java.io.IOException;
29 import java.io.InputStreamReader;
30 import java.io.UnsupportedEncodingException;
31 import java.net.HttpURLConnection;
32 import java.net.MalformedURLException;
34 import java.net.URLEncoder;
35 import java.nio.charset.StandardCharsets;
36 import java.util.Base64;
37 import java.util.Properties;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * java.net based client to build message router consumers
44 public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {
45 private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);
47 protected Boolean isReady = false;
48 protected Boolean isRunning = false;
50 protected Integer fetchPause;
51 protected Properties properties;
52 protected final String DEFAULT_CONNECT_TIMEOUT = "30000";
53 protected final String DEFAULT_READ_TIMEOUT = "180000";
54 protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
55 protected final String DEFAULT_LIMIT = null;
56 protected final String DEFAULT_FETCH_PAUSE = "5000";
58 private String authorizationString;
59 protected Integer connectTimeout;
60 protected Integer readTimeout;
61 protected String topic;
63 public MessageRouterHttpClientJdk() {}
70 HttpURLConnection httpUrlConnection = null;
72 httpUrlConnection = buildHttpURLConnection();
73 httpUrlConnection.connect();
74 int status = httpUrlConnection.getResponseCode();
75 Log.info("GET " + url + " returned http status " + status);
78 new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));
79 StringBuilder sb = new StringBuilder();
81 while ((line = br.readLine()) != null) {
82 sb.append(line + "\n");
85 String responseBody = sb.toString();
86 if (responseBody.contains("{")) {
87 // Get rid of opening [" entity =
88 responseBody = responseBody.substring(2);
89 // Get rid of closing "]
90 responseBody = responseBody.substring(0, responseBody.length() - 2);
91 // Split the json array into individual elements to process
92 for (String message : responseBody.split("\",\"")) {
94 message = message.replace("\\\"", "\"");
95 // Topic names cannot contain periods
99 Log.info("Entity doesn't appear to contain JSON elements, logging body");
100 Log.info(responseBody);
103 } catch (Exception e) {
104 Log.error("GET " + url + " failed.", e);
106 if (httpUrlConnection != null) {
107 httpUrlConnection.disconnect();
109 Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");
111 Thread.sleep(fetchPause);
112 } catch (InterruptedException e) {
113 Log.error("Could not sleep thread", e);
114 Thread.currentThread().interrupt();
122 public void init(Properties baseProperties, String consumerPropertiesPath) {
124 baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
125 processProperties(baseProperties);
126 } catch (FileNotFoundException e) {
127 Log.error("FileNotFoundException while reading consumer properties", e);
128 } catch (IOException e) {
129 Log.error("IOException while reading consumer properties", e);
133 protected void processProperties(Properties properties) throws MalformedURLException {
134 this.properties = properties;
135 String username = properties.getProperty("username");
136 String password = properties.getProperty("password");
137 topic = properties.getProperty("topic");
138 String group = properties.getProperty("group");
139 String host = properties.getProperty("host");
140 String id = properties.getProperty("id");
142 String filter = properties.getProperty("filter");
143 if (filter != null) {
144 if (filter.length() > 0) {
146 filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
147 } catch (UnsupportedEncodingException e) {
148 Log.error("Couldn't encode filter string", e);
155 String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
156 Integer limit = null;
157 if (limitString != null && limitString.length() > 0) {
158 limit = Integer.valueOf(limitString);
161 Integer timeoutQueryParamValue =
162 Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
163 connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));
164 readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));
165 if (username != null && password != null && username.length() > 0 && password.length() > 0) {
166 authorizationString = buildAuthorizationString(username, password);
168 String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);
169 this.url = new URL(urlString);
170 this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause", DEFAULT_FETCH_PAUSE));
174 public void processMsg(String msg) {
178 protected String buildAuthorizationString(String userName, String password) {
179 String basicAuthString = userName + ":" + password;
180 basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
181 return "Basic " + basicAuthString;
184 protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,
185 Integer timeout, Integer limit, String filter) {
186 StringBuilder sb = new StringBuilder();
187 sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);
188 sb.append("?timeout=" + timeout);
191 sb.append("&limit=" + limit);
193 if (filter != null) {
194 sb.append("&filter=" + filter);
196 return sb.toString();
200 public boolean isReady() {
205 public boolean isRunning() {
209 protected HttpURLConnection buildHttpURLConnection() throws IOException {
210 HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
211 if (authorizationString != null) {
212 httpUrlConnection.setRequestProperty("Authorization", authorizationString);
214 httpUrlConnection.setRequestMethod("GET");
215 httpUrlConnection.setRequestProperty("Accept", "application/json");
216 httpUrlConnection.setUseCaches(false);
217 httpUrlConnection.setConnectTimeout(connectTimeout);
218 httpUrlConnection.setReadTimeout(readTimeout);
219 return httpUrlConnection;