2 * ============LICENSE_START====================================================
\r
4 * ===========================================================================
\r
5 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
\r
7 * Modifications Copyright (C) 2019 IBM.
\r
8 * ===========================================================================
\r
9 * Licensed under the Apache License, Version 2.0 (the "License");
\r
10 * you may not use this file except in compliance with the License.
\r
11 * You may obtain a copy of the License at
\r
13 * http://www.apache.org/licenses/LICENSE-2.0
\r
15 * Unless required by applicable law or agreed to in writing, software
\r
16 * distributed under the License is distributed on an "AS IS" BASIS,
\r
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
18 * See the License for the specific language governing permissions and
\r
19 * limitations under the License.
\r
20 * ============LICENSE_END====================================================
\r
23 package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.provider.impl;
\r
25 import java.io.BufferedReader;
\r
26 import java.io.DataOutputStream;
\r
27 import java.io.IOException;
\r
28 import java.io.InputStream;
\r
29 import java.io.InputStreamReader;
\r
30 import java.net.HttpURLConnection;
\r
31 import java.net.SocketException;
\r
32 import java.net.URL;
\r
33 import java.util.Base64;
\r
35 import org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api.PublisherApi;
\r
36 import org.slf4j.Logger;
\r
37 import org.slf4j.LoggerFactory;
\r
39 public class PublisherApiImpl implements PublisherApi {
\r
40 private static final Logger logger = LoggerFactory.getLogger(PublisherApiImpl.class);
\r
41 protected static final Integer DEFAULT_CONNECT_TIMEOUT = 30000; // will be treated as 30 seconds
\r
42 protected static final Integer DEFAULT_READ_TIMEOUT = 180000; // will be treated as 3 minutes
\r
43 private String authorizationString;
\r
44 protected Integer connectTimeout;
\r
45 protected Integer readTimeout;
\r
46 protected String baseUrl;
\r
47 protected String username;
\r
48 protected String[] hosts;
\r
49 private String password;
\r
51 public void setUsername(String username) {
\r
52 this.username = username;
\r
53 setAuthorizationString();
\r
56 public void setPassword(String password) {
\r
57 this.password = password;
\r
58 setAuthorizationString();
\r
61 public void setHost(String hostString) {
\r
62 // a comma separated list of hosts can be passed in or a single host may be used
\r
63 if (!hostString.contains(",")) {
\r
64 this.hosts = new String[] { hostString };
\r
66 this.hosts = hostString.split(",");
\r
70 public PublisherApiImpl() {
\r
71 connectTimeout = DEFAULT_CONNECT_TIMEOUT;
\r
72 readTimeout = DEFAULT_READ_TIMEOUT;
\r
75 public void init() {
\r
76 setAuthorizationString();
\r
79 protected String buildUrlString(Integer hostIndex, String topic) {
\r
80 return hosts[hostIndex] + "/events/" + topic;
\r
83 protected void configureHttpURLConnection(HttpURLConnection httpUrlConnection) {
\r
84 httpUrlConnection.setRequestProperty("Content-Type", "application/json");
\r
87 public Boolean publish(String topic, String body) {
\r
88 for (int hostIndex = 0; hostIndex < hosts.length; hostIndex++) {
\r
89 HttpURLConnection httpUrlConnection = null;
\r
92 url = new URL(buildUrlString(hostIndex, topic));
\r
93 logger.info("Publishing body to topic {} using the url {}", topic, url);
\r
94 logger.info("Message to publish is\n{}", body);
\r
95 httpUrlConnection = buildHttpURLConnection(url);
\r
96 httpUrlConnection.setDoInput(true);
\r
97 httpUrlConnection.setDoOutput(true);
\r
98 httpUrlConnection.setUseCaches(false);
\r
99 httpUrlConnection.setRequestMethod("POST");
\r
102 httpUrlConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));
\r
103 DataOutputStream outStr = new DataOutputStream(httpUrlConnection.getOutputStream());
\r
104 outStr.write(body.getBytes());
\r
107 int status = httpUrlConnection.getResponseCode();
\r
108 logger.info("Publishing body for topic {} using url {} returned status {}.", topic, url, status);
\r
109 if (status < 300) {
\r
110 String responseFromDMaaP = readFromStream(httpUrlConnection.getInputStream());
\r
111 logger.info("Message router response is\n{}", responseFromDMaaP);
\r
114 if (httpUrlConnection.getErrorStream() != null) {
\r
115 String responseFromDMaaP = readFromStream(httpUrlConnection.getErrorStream());
\r
116 logger.warn("Publishing body for topic {} using url {} failed." + " Error message is\n{}",
\r
117 topic, url, responseFromDMaaP);
\r
122 } catch (SocketException socketException) {
\r
123 logger.error("SocketException was thrown during publishing message to DMaaP on url {}.", url,
\r
125 if (hostIndex < hosts.length) {
\r
126 logger.info("Message sent to {} failed with a SocketException, but will be tried on {}",
\r
127 hosts[hostIndex], hosts[hostIndex + 1]);
\r
129 } catch (Exception e) {
\r
130 logger.warn("Exception was thrown during publishing message to DMaaP on url {}.", url, e);
\r
133 if (httpUrlConnection != null) {
\r
134 httpUrlConnection.disconnect();
\r
141 protected void setAuthorizationString() {
\r
142 String str = buildAuthorizationString(this.username, this.password);
\r
143 this.authorizationString = str;
\r
144 //System.out.println(this.authorizationString);
\r
147 protected String buildAuthorizationString(String username, String password) {
\r
148 String basicAuthString = username + ":" + password;
\r
149 basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
\r
150 return "Basic " + basicAuthString;
\r
153 protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException {
\r
154 HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
\r
155 if (authorizationString != null) {
\r
156 System.out.println(authorizationString);
\r
157 httpUrlConnection.setRequestProperty("Authorization", authorizationString);
\r
159 httpUrlConnection.setRequestProperty("Accept", "application/json");
\r
160 httpUrlConnection.setUseCaches(false);
\r
161 httpUrlConnection.setConnectTimeout(connectTimeout);
\r
162 httpUrlConnection.setReadTimeout(readTimeout);
\r
163 configureHttpURLConnection(httpUrlConnection);
\r
164 return httpUrlConnection;
\r
167 protected String readFromStream(InputStream stream) throws IOException {
\r
168 BufferedReader br = new BufferedReader(new InputStreamReader(stream));
\r
169 StringBuilder sb = new StringBuilder();
\r
171 while ((line = br.readLine()) != null) {
\r
175 return sb.toString();
\r