2  * ============LICENSE_START=======================================================
\r 
   4  * ================================================================================
\r 
   5  * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
\r 
   7  * ================================================================================
\r 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
\r 
   9  * you may not use this file except in compliance with the License.
\r 
  10  * You may obtain a copy of the License at
\r 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
\r 
  14  * Unless required by applicable law or agreed to in writing, software
\r 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
\r 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r 
  17  * See the License for the specific language governing permissions and
\r 
  18  * limitations under the License.
\r 
  19  * ============LICENSE_END=========================================================
\r 
  22 package org.onap.ccsdk.sli.northbound.dmaapclient;
\r 
  24 import java.io.BufferedReader;
\r 
  25 import java.io.File;
\r 
  26 import java.io.FileInputStream;
\r 
  27 import java.io.FileNotFoundException;
\r 
  28 import java.io.IOException;
\r 
  29 import java.io.InputStreamReader;
\r 
  30 import java.io.UnsupportedEncodingException;
\r 
  31 import java.net.MalformedURLException;
\r 
  32 import java.net.URL;
\r 
  33 import java.net.URLEncoder;
\r 
  34 import java.nio.charset.StandardCharsets;
\r 
  35 import java.util.Base64;
\r 
  36 import java.util.Properties;
\r 
  37 import org.slf4j.Logger;
\r 
  38 import org.slf4j.LoggerFactory;
\r 
  39 import java.net.HttpURLConnection;
\r 
  42  * java.net based client to build message router consumers
\r 
  44 public class MessageRouterHttpClientJdk implements SdncDmaapConsumer {
\r 
  45     private static final Logger Log = LoggerFactory.getLogger(MessageRouterHttpClientJdk.class);
\r 
  47     protected Boolean isReady = false;
\r 
  48     protected Boolean isRunning = false;
\r 
  50     protected Integer fetchPause;
\r 
  51     protected Properties properties;
\r 
  52     protected final String DEFAULT_CONNECT_TIMEOUT = "30000";
\r 
  53     protected final String DEFAULT_READ_TIMEOUT = "180000";
\r 
  54     protected final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
\r 
  55     protected final String DEFAULT_LIMIT = null;
\r 
  56     private String authorizationString;
\r 
  57     protected Integer connectTimeout;
\r 
  58     protected Integer readTimeout;
\r 
  59     protected String topic;
\r 
  61     public MessageRouterHttpClientJdk() {}
\r 
  68                 HttpURLConnection httpUrlConnection = null;
\r 
  70                     httpUrlConnection = (HttpURLConnection) url.openConnection();
\r 
  71                     if (authorizationString != null) {
\r 
  72                         httpUrlConnection.addRequestProperty("Authorization", authorizationString);
\r 
  74                     httpUrlConnection.setRequestMethod("GET");
\r 
  75                     httpUrlConnection.setRequestProperty("Accept", "application/json");
\r 
  76                     httpUrlConnection.setUseCaches(false);
\r 
  77                     httpUrlConnection.setConnectTimeout(connectTimeout);
\r 
  78                     httpUrlConnection.setReadTimeout(readTimeout);
\r 
  79                     httpUrlConnection.connect();
\r 
  80                     int status = httpUrlConnection.getResponseCode();
\r 
  81                     Log.info("GET " + url + " returned http status " + status);
\r 
  84                                 new BufferedReader(new InputStreamReader(httpUrlConnection.getInputStream()));
\r 
  85                         StringBuilder sb = new StringBuilder();
\r 
  87                         while ((line = br.readLine()) != null) {
\r 
  88                             sb.append(line + "\n");
\r 
  91                         String responseBody = sb.toString();
\r 
  92                         if (responseBody.contains("{")) {
\r 
  93                             // Get rid of opening [" entity =
\r 
  94                             responseBody = responseBody.substring(2);
\r 
  95                             // Get rid of closing "]
\r 
  96                             responseBody = responseBody.substring(0, responseBody.length() - 2);
\r 
  97                             // Split the json array into individual elements to process
\r 
  98                             for (String message : responseBody.split("\",\"")) {
\r 
  99                                 // unescape the json
\r 
 100                                 message = message.replace("\\\"", "\"");
\r 
 101                                 // Topic names cannot contain periods
\r 
 102                                 processMsg(message);
\r 
 105                             Log.info("Entity doesn't appear to contain JSON elements, logging body");
\r 
 106                             Log.info(responseBody);
\r 
 109                 } catch (Exception e) {
\r 
 110                     Log.error("GET " + url + " failed.", e);
\r 
 112                     if (httpUrlConnection != null) {
\r 
 113                         httpUrlConnection.disconnect();
\r 
 115                     Log.info("Pausing " + fetchPause + " milliseconds before fetching from " + url + " again.");
\r 
 117                         Thread.sleep(fetchPause);
\r 
 118                     } catch (InterruptedException e) {
\r 
 119                         Log.error("Could not sleep thread", e);
\r 
 127     public void init(Properties baseProperties, String consumerPropertiesPath) {
\r 
 129             baseProperties.load(new FileInputStream(new File(consumerPropertiesPath)));
\r 
 131             this.properties = baseProperties;
\r 
 132             String username = properties.getProperty("username");
\r 
 133             String password = properties.getProperty("password");
\r 
 134             topic = properties.getProperty("topic");
\r 
 135             String group = properties.getProperty("group");
\r 
 136             String host = properties.getProperty("host");
\r 
 137             String id = properties.getProperty("id");
\r 
 139             String filter = properties.getProperty("filter");
\r 
 140             if (filter != null) {
\r 
 141                 if (filter.length() > 0) {
\r 
 143                         filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
\r 
 144                     } catch (UnsupportedEncodingException e) {
\r 
 145                         Log.error("Couldn't encode filter string", e);
\r 
 152             String limitString = properties.getProperty("limit", DEFAULT_LIMIT);
\r 
 153             Integer limit = null;
\r 
 154             if (limitString != null && limitString.length() > 0) {
\r 
 155                 limit = Integer.valueOf(limitString);
\r 
 158             Integer timeoutQueryParamValue =
\r 
 159                     Integer.valueOf(properties.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
\r 
 160             connectTimeout = Integer.valueOf(properties.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT));
\r 
 161             readTimeout = Integer.valueOf(properties.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT));
\r 
 162             if (username != null && password != null && username.length() > 0 && password.length() > 0) {
\r 
 163                 authorizationString = buildAuthorizationString(username, password);
\r 
 165             String urlString = buildlUrlString(topic, group, id, host, timeoutQueryParamValue, limit, filter);
\r 
 166             this.url = new URL(urlString);
\r 
 167             this.fetchPause = Integer.valueOf(properties.getProperty("fetchPause"));
\r 
 168             this.isReady = true;
\r 
 169         } catch (FileNotFoundException e) {
\r 
 170             Log.error("FileNotFoundException while reading consumer properties", e);
\r 
 171         } catch (IOException e) {
\r 
 172             Log.error("IOException while reading consumer properties", e);
\r 
 176     public void processMsg(String msg) {
\r 
 180     protected String buildAuthorizationString(String userName, String password) {
\r 
 181         String basicAuthString = userName + ":" + password;
\r 
 182         basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
\r 
 183         return "Basic " + basicAuthString;
\r 
 186     protected String buildlUrlString(String topic, String consumerGroup, String consumerId, String host,
\r 
 187             Integer timeout, Integer limit, String filter) {
\r 
 188         StringBuilder sb = new StringBuilder();
\r 
 189         sb.append("http://" + host + "/events/" + topic + "/" + consumerGroup + "/" + consumerId);
\r 
 190         sb.append("?timeout=" + timeout);
\r 
 192         if (limit != null) {
\r 
 193             sb.append("&limit=" + limit);
\r 
 195         if (filter != null) {
\r 
 196             sb.append("&filter=" + filter);
\r 
 198         return sb.toString();
\r 
 202     public boolean isReady() {
\r 
 207     public boolean isRunning() {
\r