1 package org.onap.ccsdk.messagerouter.publisher.provider.impl;
\r 
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.onap.ccsdk.messagerouter.publisher.api.PublisherApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
\r 
  17 public class PublisherApiImpl implements PublisherApi {
\r 
  18         private static final Logger logger = LoggerFactory.getLogger(PublisherApiImpl.class);
\r 
  19         protected final Integer DEFAULT_CONNECT_TIMEOUT = 30000; // will be treated as 30 seconds
\r 
  20         protected final Integer DEFAULT_READ_TIMEOUT = 180000; // will be treated as 3 minutes
\r 
  21         private String authorizationString;
\r 
  22         protected Integer connectTimeout;
\r 
  23         protected Integer readTimeout;
\r 
  24         protected String baseUrl;
\r 
  25         protected String username;
\r 
  26         protected String[] hosts;
\r 
  27         private String password;
\r 
  29         public void setUsername(String username) {
\r 
  30                 this.username = username;
\r 
  31                 buildAuthorizationString();
\r 
  34         public void setPassword(String password) {
\r 
  35                 this.password = password;
\r 
  36                 buildAuthorizationString();
\r 
  39         public void setHost(String hostString) {
\r 
  40                 // a comma separated list of hosts can be passed in or a single host may be used
\r 
  41                 if (!hostString.contains(",")) {
\r 
  42                         this.hosts = new String[] { hostString };
\r 
  44                         this.hosts = hostString.split(",");
\r 
  48         public PublisherApiImpl() {
\r 
  49                 connectTimeout = DEFAULT_CONNECT_TIMEOUT;
\r 
  50                 readTimeout = DEFAULT_READ_TIMEOUT;
\r 
  53         public void init() {
\r 
  54                 buildAuthorizationString();
\r 
  57         private String buildUrlString(Integer hostIndex, String topic) {
\r 
  58                 return hosts[hostIndex] + "/events/" + topic;
\r 
  61         protected void configureHttpURLConnection(HttpURLConnection httpUrlConnection) {
\r 
  62                 httpUrlConnection.setRequestProperty("Content-Type", "application/json");
\r 
  65         public Boolean publish(String topic, String body) {
\r 
  66                 for (int hostIndex = 0; hostIndex < hosts.length; hostIndex++) {
\r 
  67                         HttpURLConnection httpUrlConnection = null;
\r 
  70                                 url = new URL(buildUrlString(hostIndex, topic));
\r 
  71                                 logger.info("Publishing body to topic {} using the url {}", topic, url);
\r 
  72                                 logger.info("Message to publish is\n{}", body);
\r 
  73                                 httpUrlConnection = buildHttpURLConnection(url);
\r 
  74                                 httpUrlConnection.setDoInput(true);
\r 
  75                                 httpUrlConnection.setDoOutput(true);
\r 
  76                                 httpUrlConnection.setUseCaches(false);
\r 
  77                                 httpUrlConnection.setRequestMethod("POST");
\r 
  80                                 httpUrlConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));
\r 
  81                                 DataOutputStream outStr = new DataOutputStream(httpUrlConnection.getOutputStream());
\r 
  82                                 outStr.write(body.getBytes());
\r 
  85                                 int status = httpUrlConnection.getResponseCode();
\r 
  86                                 logger.info("Publishing body for topic {} using  url {} returned status {}.", topic, url, status);
\r 
  88                                         String responseFromDMaaP = readFromStream(httpUrlConnection.getInputStream());
\r 
  89                                         logger.info("Message router response is\n{}", responseFromDMaaP);
\r 
  92                                         if (httpUrlConnection.getErrorStream() != null) {
\r 
  93                                                 String responseFromDMaaP = readFromStream(httpUrlConnection.getErrorStream());
\r 
  94                                                 logger.warn("Publishing body for topic {} using  url {} failed." + " Error message is\n{}",
\r 
  95                                                                 topic, url, responseFromDMaaP);
\r 
 100                         } catch (SocketException socketException) {
\r 
 101                                 logger.error("SocketException was thrown during publishing message to DMaaP on url {}.", url,
\r 
 103                                 if (hostIndex < hosts.length) {
\r 
 104                                         logger.info("Message sent to {} failed with a SocketException, but will be tried on {}",
\r 
 105                                                         hosts[hostIndex], hosts[hostIndex + 1]);
\r 
 107                         } catch (Exception e) {
\r 
 108                                 logger.warn("Exception was thrown during publishing message to DMaaP on url {}.", url, e);
\r 
 111                                 if (httpUrlConnection != null) {
\r 
 112                                         httpUrlConnection.disconnect();
\r 
 119         private void buildAuthorizationString() {
\r 
 120                 String basicAuthString = username + ":" + password;
\r 
 121                 basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
\r 
 122                 this.authorizationString = "Basic " + basicAuthString;
\r 
 125         protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException {
\r 
 126                 HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
\r 
 127                 if (authorizationString != null) {
\r 
 128                         httpUrlConnection.setRequestProperty("Authorization", authorizationString);
\r 
 130                 httpUrlConnection.setRequestProperty("Accept", "application/json");
\r 
 131                 httpUrlConnection.setUseCaches(false);
\r 
 132                 httpUrlConnection.setConnectTimeout(connectTimeout);
\r 
 133                 httpUrlConnection.setReadTimeout(readTimeout);
\r 
 134                 configureHttpURLConnection(httpUrlConnection);
\r 
 135                 return httpUrlConnection;
\r 
 138         protected String readFromStream(InputStream stream) throws IOException {
\r 
 139                 BufferedReader br = new BufferedReader(new InputStreamReader(stream));
\r 
 140                 StringBuilder sb = new StringBuilder();
\r 
 142                 while ((line = br.readLine()) != null) {
\r 
 146                 return sb.toString();
\r