2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.ccsdk.sli.northbound.dmaapclient;
 
  24 import java.io.BufferedReader;
 
  26 import java.io.FileInputStream;
 
  27 import java.io.FileNotFoundException;
 
  28 import java.io.IOException;
 
  29 import java.io.InputStreamReader;
 
  30 import java.io.UnsupportedEncodingException;
 
  31 import java.net.HttpURLConnection;
 
  32 import java.net.MalformedURLException;
 
  34 import java.net.URLEncoder;
 
  35 import java.nio.charset.StandardCharsets;
 
  36 import java.util.Base64;
 
  37 import java.util.Properties;
 
  38 import org.slf4j.Logger;
 
  39 import org.slf4j.LoggerFactory;
 
  42  * java.net based client to build message router consumers
 
  44 public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {
 
  45     private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);
 
  47     protected Boolean isReady = false;
 
  48     protected Boolean isRunning = false;
 
  50     protected Integer fetchPause;
 
  51     protected Properties properties;
 
  52     protected final String DEFAULT_CONNECT_TIMEOUT = "30000";
 
  53     protected final String DEFAULT_READ_TIMEOUT = "180000";
 
  54     protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
 
  55     protected final String DEFAULT_LIMIT = null;
 
  56     protected final String DEFAULT_FETCH_PAUSE = "5000";
 
  58     private String authorizationString;
 
  59     protected Integer connectTimeout;
 
  60     protected Integer readTimeout;
 
  61     protected String topic;
 
  63     public MessageRouterHttpClientJdk() {}
 
  70                 HttpURLConnection httpUrlConnection = null;
 
  72                     httpUrlConnection = buildHttpURLConnection();
 
  73                     httpUrlConnection.connect();
 
  74                     int status = httpUrlConnection.getResponseCode();
 
  75                     Log.info("GET " + url + " returned http status " + status);
 
  78                                 new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));
 
  79                         StringBuilder sb = new StringBuilder();
 
  81                         while ((line = br.readLine()) != null) {
 
  82                             sb.append(line + "\n");
 
  85                         String responseBody = sb.toString();
 
  86                         if (responseBody.contains("{")) {
 
  87                             // Get rid of opening [" entity =
 
  88                             responseBody = responseBody.substring(2);
 
  89                             // Get rid of closing "]
 
  90                             responseBody = responseBody.substring(0, responseBody.length() - 2);
 
  91                             // Split the json array into individual elements to process
 
  92                             for (String message : responseBody.split("\",\"")) {
 
  94                                 message = message.replace("\\\"", "\"");
 
  95                                 // Topic names cannot contain periods
 
  99                             Log.info("Entity doesn't appear to contain JSON elements, logging body");
 
 100                             Log.info(responseBody);
 
 103                 } catch (Exception e) {
 
 104                     Log.error("GET " + url + " failed.", e);
 
 106                     if (httpUrlConnection != null) {
 
 107                         httpUrlConnection.disconnect();
 
 109                     Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");
 
 111                         Thread.sleep(fetchPause);
 
 112                     } catch (InterruptedException e) {
 
 113                         Log.error("Could not sleep thread", e);
 
 121     public void init(Properties baseProperties, String consumerPropertiesPath) {
 
 123             baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
 
 124             processProperties(baseProperties);
 
 125         } catch (FileNotFoundException e) {
 
 126             Log.error("FileNotFoundException while reading consumer properties", e);
 
 127         } catch (IOException e) {
 
 128             Log.error("IOException while reading consumer properties", e);
 
 132     protected void processProperties(Properties properties) throws MalformedURLException {
 
 133         this.properties = properties;
 
 134         String username = properties.getProperty("username");
 
 135         String password = properties.getProperty("password");
 
 136         topic = properties.getProperty("topic");
 
 137         String group = properties.getProperty("group");
 
 138         String host = properties.getProperty("host");
 
 139         String id = properties.getProperty("id");
 
 141         String filter = properties.getProperty("filter");
 
 142         if (filter != null) {
 
 143             if (filter.length() > 0) {
 
 145                     filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
 
 146                 } catch (UnsupportedEncodingException e) {
 
 147                     Log.error("Couldn't encode filter string", e);
 
 154         String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
 
 155         Integer limit = null;
 
 156         if (limitString != null && limitString.length() > 0) {
 
 157             limit = Integer.valueOf(limitString);
 
 160         Integer timeoutQueryParamValue =
 
 161                 Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
 
 162         connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));
 
 163         readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));
 
 164         if (username != null && password != null && username.length() > 0 && password.length() > 0) {
 
 165             authorizationString = buildAuthorizationString(username, password);
 
 167         String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);
 
 168         this.url = new URL(urlString);
 
 169         this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause", DEFAULT_FETCH_PAUSE));
 
 173     public void processMsg(String msg) {
 
 177     protected String buildAuthorizationString(String userName, String password) {
 
 178         String basicAuthString = userName + ":" + password;
 
 179         basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
 
 180         return "Basic " + basicAuthString;
 
 183     protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,
 
 184             Integer timeout, Integer limit, String filter) {
 
 185         StringBuilder sb = new StringBuilder();
 
 186         sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);
 
 187         sb.append("?timeout=" + timeout);
 
 190             sb.append("&limit=" + limit);
 
 192         if (filter != null) {
 
 193             sb.append("&filter=" + filter);
 
 195         return sb.toString();
 
 199     public boolean isReady() {
 
 204     public boolean isRunning() {
 
 208     protected HttpURLConnection buildHttpURLConnection() throws IOException {
 
 209         HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
 
 210         if (authorizationString != null) {
 
 211             httpUrlConnection.setRequestProperty("Authorization", authorizationString);
 
 213         httpUrlConnection.setRequestMethod("GET");
 
 214         httpUrlConnection.setRequestProperty("Accept", "application/json");
 
 215         httpUrlConnection.setUseCaches(false);
 
 216         httpUrlConnection.setConnectTimeout(connectTimeout);
 
 217         httpUrlConnection.setReadTimeout(readTimeout);
 
 218         return httpUrlConnection;