1 package org.onap.ccsdk.messagerouter.publisher.provider.impl;
\r
3 import java.io.BufferedReader;
\r
4 import java.io.DataOutputStream;
\r
5 import java.io.IOException;
\r
6 import java.io.InputStream;
\r
7 import java.io.InputStreamReader;
\r
8 import java.net.HttpURLConnection;
\r
9 import java.net.SocketException;
\r
10 import java.net.URL;
\r
11 import java.util.Base64;
\r
13 import org.onap.ccsdk.messagerouter.publisher.api.PublisherApi;
\r
14 import org.slf4j.Logger;
\r
15 import org.slf4j.LoggerFactory;
\r
17 public class PublisherApiImpl implements PublisherApi {
\r
18 private static final Logger logger = LoggerFactory.getLogger(PublisherApiImpl.class);
\r
19 protected final Integer DEFAULT_CONNECT_TIMEOUT = 30000; // will be treated as 30 seconds
\r
20 protected final Integer DEFAULT_READ_TIMEOUT = 180000; // will be treated as 3 minutes
\r
21 private String authorizationString;
\r
22 protected Integer connectTimeout;
\r
23 protected Integer readTimeout;
\r
24 protected String baseUrl;
\r
25 protected String username;
\r
26 protected String[] hosts;
\r
27 private String password;
\r
29 public void setUsername(String username) {
\r
30 this.username = username;
\r
31 buildAuthorizationString();
\r
34 public void setPassword(String password) {
\r
35 this.password = password;
\r
36 buildAuthorizationString();
\r
39 public void setHost(String hostString) {
\r
40 // a comma separated list of hosts can be passed in or a single host may be used
\r
41 if (!hostString.contains(",")) {
\r
42 this.hosts = new String[] { hostString };
\r
44 this.hosts = hostString.split(",");
\r
48 public PublisherApiImpl() {
\r
49 connectTimeout = DEFAULT_CONNECT_TIMEOUT;
\r
50 readTimeout = DEFAULT_READ_TIMEOUT;
\r
53 public void init() {
\r
54 buildAuthorizationString();
\r
57 private String buildUrlString(Integer hostIndex, String topic) {
\r
58 return hosts[hostIndex] + "/events/" + topic;
\r
61 protected void configureHttpURLConnection(HttpURLConnection httpUrlConnection) {
\r
62 httpUrlConnection.setRequestProperty("Content-Type", "application/json");
\r
65 public Boolean publish(String topic, String body) {
\r
66 for (int hostIndex = 0; hostIndex < hosts.length; hostIndex++) {
\r
67 HttpURLConnection httpUrlConnection = null;
\r
70 url = new URL(buildUrlString(hostIndex, topic));
\r
71 logger.info("Publishing body to topic {} using the url {}", topic, url);
\r
72 logger.info("Message to publish is\n{}", body);
\r
73 httpUrlConnection = buildHttpURLConnection(url);
\r
74 httpUrlConnection.setDoInput(true);
\r
75 httpUrlConnection.setDoOutput(true);
\r
76 httpUrlConnection.setUseCaches(false);
\r
77 httpUrlConnection.setRequestMethod("POST");
\r
80 httpUrlConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));
\r
81 DataOutputStream outStr = new DataOutputStream(httpUrlConnection.getOutputStream());
\r
82 outStr.write(body.getBytes());
\r
85 int status = httpUrlConnection.getResponseCode();
\r
86 logger.info("Publishing body for topic {} using url {} returned status {}.", topic, url, status);
\r
88 String responseFromDMaaP = readFromStream(httpUrlConnection.getInputStream());
\r
89 logger.info("Message router response is\n{}", responseFromDMaaP);
\r
92 if (httpUrlConnection.getErrorStream() != null) {
\r
93 String responseFromDMaaP = readFromStream(httpUrlConnection.getErrorStream());
\r
94 logger.warn("Publishing body for topic {} using url {} failed." + " Error message is\n{}",
\r
95 topic, url, responseFromDMaaP);
\r
100 } catch (SocketException socketException) {
\r
101 logger.error("SocketException was thrown during publishing message to DMaaP on url {}.", url,
\r
103 if (hostIndex < hosts.length) {
\r
104 logger.info("Message sent to {} failed with a SocketException, but will be tried on {}",
\r
105 hosts[hostIndex], hosts[hostIndex + 1]);
\r
107 } catch (Exception e) {
\r
108 logger.warn("Exception was thrown during publishing message to DMaaP on url {}.", url, e);
\r
111 if (httpUrlConnection != null) {
\r
112 httpUrlConnection.disconnect();
\r
119 private void buildAuthorizationString() {
\r
120 String basicAuthString = username + ":" + password;
\r
121 basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
\r
122 this.authorizationString = "Basic " + basicAuthString;
\r
125 protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException {
\r
126 HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
\r
127 if (authorizationString != null) {
\r
128 httpUrlConnection.setRequestProperty("Authorization", authorizationString);
\r
130 httpUrlConnection.setRequestProperty("Accept", "application/json");
\r
131 httpUrlConnection.setUseCaches(false);
\r
132 httpUrlConnection.setConnectTimeout(connectTimeout);
\r
133 httpUrlConnection.setReadTimeout(readTimeout);
\r
134 configureHttpURLConnection(httpUrlConnection);
\r
135 return httpUrlConnection;
\r
138 protected String readFromStream(InputStream stream) throws IOException {
\r
139 BufferedReader br = new BufferedReader(new InputStreamReader(stream));
\r
140 StringBuilder sb = new StringBuilder();
\r
142 while ((line = br.readLine()) != null) {
\r
146 return sb.toString();
\r