2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
\r
7 * ================================================================================
\r
8 * Licensed under the Apache License, Version 2.0 (the "License");
\r
9 * you may not use this file except in compliance with the License.
\r
10 * You may obtain a copy of the License at
\r
12 * http://www.apache.org/licenses/LICENSE-2.0
\r
14 * Unless required by applicable law or agreed to in writing, software
\r
15 * distributed under the License is distributed on an "AS IS" BASIS,
\r
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
17 * See the License for the specific language governing permissions and
\r
18 * limitations under the License.
\r
19 * ============LICENSE_END=========================================================
\r
22 package org.onap.ccsdk.sli.northbound.dmaapclient;
\r
24 import java.io.BufferedReader;
\r
25 import java.io.File;
\r
26 import java.io.FileInputStream;
\r
27 import java.io.FileNotFoundException;
\r
28 import java.io.IOException;
\r
29 import java.io.InputStreamReader;
\r
30 import java.io.UnsupportedEncodingException;
\r
31 import java.net.MalformedURLException;
\r
32 import java.net.URL;
\r
33 import java.net.URLEncoder;
\r
34 import java.nio.charset.StandardCharsets;
\r
35 import java.util.Base64;
\r
36 import java.util.Properties;
\r
37 import org.slf4j.Logger;
\r
38 import org.slf4j.LoggerFactory;
\r
39 import java.net.HttpURLConnection;
\r
42 * java.net based client to build message router consumers
\r
44 public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {
\r
45 private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);
\r
47 protected Boolean isReady = false;
\r
48 protected Boolean isRunning = false;
\r
50 protected Integer fetchPause;
\r
51 protected Properties properties;
\r
52 protected final String DEFAULT_CONNECT_TIMEOUT = "30000";
\r
53 protected final String DEFAULT_READ_TIMEOUT = "180000";
\r
54 protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
\r
55 protected final String DEFAULT_LIMIT = null;
\r
56 private String authorizationString;
\r
57 protected Integer connectTimeout;
\r
58 protected Integer readTimeout;
\r
59 protected String topic;
\r
61 public MessageRouterHttpClientJdk() {}
\r
68 HttpURLConnection httpUrlConnection = null;
\r
70 httpUrlConnection = (HttpURLConnection) url.openConnection();
\r
71 if (authorizationString != null) {
\r
72 httpUrlConnection.addRequestProperty("Authorization", authorizationString);
\r
74 httpUrlConnection.setRequestMethod("GET");
\r
75 httpUrlConnection.setRequestProperty("Accept", "application/json");
\r
76 httpUrlConnection.setUseCaches(false);
\r
77 httpUrlConnection.setConnectTimeout(connectTimeout);
\r
78 httpUrlConnection.setReadTimeout(readTimeout);
\r
79 httpUrlConnection.connect();
\r
80 int status = httpUrlConnection.getResponseCode();
\r
81 Log.info("GET " + url + " returned http status " + status);
\r
84 new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));
\r
85 StringBuilder sb = new StringBuilder();
\r
87 while ((line = br.readLine()) != null) {
\r
88 sb.append(line + "\n");
\r
91 String responseBody = sb.toString();
\r
92 if (responseBody.contains("{")) {
\r
93 // Get rid of opening [" entity =
\r
94 responseBody = responseBody.substring(2);
\r
95 // Get rid of closing "]
\r
96 responseBody = responseBody.substring(0, responseBody.length() - 2);
\r
97 // Split the json array into individual elements to process
\r
98 for (String message : responseBody.split("\",\"")) {
\r
99 // unescape the json
\r
100 message = message.replace("\\\"", "\"");
\r
101 // Topic names cannot contain periods
\r
102 processMsg(message);
\r
105 Log.info("Entity doesn't appear to contain JSON elements, logging body");
\r
106 Log.info(responseBody);
\r
109 } catch (Exception e) {
\r
110 Log.error("GET " + url + " failed.", e);
\r
112 if (httpUrlConnection != null) {
\r
113 httpUrlConnection.disconnect();
\r
115 Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");
\r
117 Thread.sleep(fetchPause);
\r
118 } catch (InterruptedException e) {
\r
119 Log.error("Could not sleep thread", e);
\r
127 public void init(Properties baseProperties, String consumerPropertiesPath) {
\r
129 baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
\r
131 this.properties = baseProperties;
\r
132 String username = properties.getProperty("username");
\r
133 String password = properties.getProperty("password");
\r
134 topic = properties.getProperty("topic");
\r
135 String group = properties.getProperty("group");
\r
136 String host = properties.getProperty("host");
\r
137 String id = properties.getProperty("id");
\r
139 String filter = properties.getProperty("filter");
\r
140 if (filter != null) {
\r
141 if (filter.length() > 0) {
\r
143 filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
\r
144 } catch (UnsupportedEncodingException e) {
\r
145 Log.error("Couldn't encode filter string", e);
\r
152 String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
\r
153 Integer limit = null;
\r
154 if (limitString != null && limitString.length() > 0) {
\r
155 limit = Integer.valueOf(limitString);
\r
158 Integer timeoutQueryParamValue =
\r
159 Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
\r
160 connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));
\r
161 readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));
\r
162 if (username != null && password != null && username.length() > 0 && password.length() > 0) {
\r
163 authorizationString = buildAuthorizationString(username, password);
\r
165 String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);
\r
166 this.url = new URL(urlString);
\r
167 this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause"));
\r
168 this.isReady = true;
\r
169 } catch (FileNotFoundException e) {
\r
170 Log.error("FileNotFoundException while reading consumer properties", e);
\r
171 } catch (IOException e) {
\r
172 Log.error("IOException while reading consumer properties", e);
\r
176 public void processMsg(String msg) {
\r
180 protected String buildAuthorizationString(String userName, String password) {
\r
181 String basicAuthString = userName + ":" + password;
\r
182 basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
\r
183 return "Basic " + basicAuthString;
\r
186 protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,
\r
187 Integer timeout, Integer limit, String filter) {
\r
188 StringBuilder sb = new StringBuilder();
\r
189 sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);
\r
190 sb.append("?timeout=" + timeout);
\r
192 if (limit != null) {
\r
193 sb.append("&limit=" + limit);
\r
195 if (filter != null) {
\r
196 sb.append("&filter=" + filter);
\r
198 return sb.toString();
\r
202 public boolean isReady() {
\r
207 public boolean isRunning() {
\r