58d0bc9f1875bd51bbdc9e5a218bbd3386e2c72d
[ccsdk/sli/adaptors.git] /
1 package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.provider.impl;\r
2 \r
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
12 \r
13 import org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api.PublisherApi;\r
14 import org.slf4j.Logger;\r
15 import org.slf4j.LoggerFactory;\r
16 \r
17 public class PublisherApiImpl implements PublisherApi {\r
18         private static final Logger logger = LoggerFactory.getLogger(PublisherApiImpl.class);\r
19         protected final Integer DEFAULT_CONNECT_TIMEOUT = 30000; // will be treated as 30 seconds\r
20         protected final Integer DEFAULT_READ_TIMEOUT = 180000; // will be treated as 3 minutes\r
21         private String authorizationString;\r
22         protected Integer connectTimeout;\r
23         protected Integer readTimeout;\r
24         protected String baseUrl;\r
25         protected String username;\r
26         protected String[] hosts;\r
27         private String password;\r
28 \r
29         public void setUsername(String username) {\r
30                 this.username = username;\r
31                 setAuthorizationString();\r
32         }\r
33 \r
34         public void setPassword(String password) {\r
35                 this.password = password;\r
36                 setAuthorizationString();\r
37         }\r
38 \r
39         public void setHost(String hostString) {\r
40                 // a comma separated list of hosts can be passed in or a single host may be used\r
41                 if (!hostString.contains(",")) {\r
42                         this.hosts = new String[] { hostString };\r
43                 } else {\r
44                         this.hosts = hostString.split(",");\r
45                 }\r
46         }\r
47 \r
48         public PublisherApiImpl() {\r
49                 connectTimeout = DEFAULT_CONNECT_TIMEOUT;\r
50                 readTimeout = DEFAULT_READ_TIMEOUT;\r
51         }\r
52 \r
53         public void init() {\r
54                 setAuthorizationString();\r
55         }\r
56 \r
57         protected String buildUrlString(Integer hostIndex, String topic) {\r
58                 return hosts[hostIndex] + "/events/" + topic;\r
59         }\r
60 \r
61         protected void configureHttpURLConnection(HttpURLConnection httpUrlConnection) {\r
62                 httpUrlConnection.setRequestProperty("Content-Type", "application/json");\r
63         }\r
64 \r
65         public Boolean publish(String topic, String body) {\r
66                 for (int hostIndex = 0; hostIndex < hosts.length; hostIndex++) {\r
67                         HttpURLConnection httpUrlConnection = null;\r
68                         URL url = null;\r
69                         try {\r
70                                 url = new URL(buildUrlString(hostIndex, topic));\r
71                                 logger.info("Publishing body to topic {} using the url {}", topic, url);\r
72                                 logger.info("Message to publish is\n{}", body);\r
73                                 httpUrlConnection = buildHttpURLConnection(url);\r
74                                 httpUrlConnection.setDoInput(true);\r
75                                 httpUrlConnection.setDoOutput(true);\r
76                                 httpUrlConnection.setUseCaches(false);\r
77                                 httpUrlConnection.setRequestMethod("POST");\r
78 \r
79                                 // Write message\r
80                                 httpUrlConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));\r
81                                 DataOutputStream outStr = new DataOutputStream(httpUrlConnection.getOutputStream());\r
82                                 outStr.write(body.getBytes());\r
83                                 outStr.close();\r
84 \r
85                                 int status = httpUrlConnection.getResponseCode();\r
86                                 logger.info("Publishing body for topic {} using  url {} returned status {}.", topic, url, status);\r
87                                 if (status < 300) {\r
88                                         String responseFromDMaaP = readFromStream(httpUrlConnection.getInputStream());\r
89                                         logger.info("Message router response is\n{}", responseFromDMaaP);\r
90                                         return true;\r
91                                 } else {\r
92                                         if (httpUrlConnection.getErrorStream() != null) {\r
93                                                 String responseFromDMaaP = readFromStream(httpUrlConnection.getErrorStream());\r
94                                                 logger.warn("Publishing body for topic {} using  url {} failed." + " Error message is\n{}",\r
95                                                                 topic, url, responseFromDMaaP);\r
96                                         }\r
97                                         return false;\r
98                                 }\r
99 \r
100                         } catch (SocketException socketException) {\r
101                                 logger.error("SocketException was thrown during publishing message to DMaaP on url {}.", url,\r
102                                                 socketException);\r
103                                 if (hostIndex < hosts.length) {\r
104                                         logger.info("Message sent to {} failed with a SocketException, but will be tried on {}",\r
105                                                         hosts[hostIndex], hosts[hostIndex + 1]);\r
106                                 }\r
107                         } catch (Exception e) {\r
108                                 logger.warn("Exception was thrown during publishing message to DMaaP on url {}.", url, e);\r
109                                 return false;\r
110                         } finally {\r
111                                 if (httpUrlConnection != null) {\r
112                                         httpUrlConnection.disconnect();\r
113                                 }\r
114                         }\r
115                 }\r
116                 return false;\r
117         }\r
118 \r
119         protected void setAuthorizationString() {\r
120             String str = buildAuthorizationString(this.username, this.password);\r
121                 this.authorizationString = str;\r
122                 //System.out.println(this.authorizationString);\r
123         }\r
124         \r
125          protected String buildAuthorizationString(String username, String password) {\r
126              String basicAuthString = username + ":" + password;\r
127              basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());\r
128              return "Basic " + basicAuthString;\r
129         }\r
130 \r
131         protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException {\r
132                 HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();\r
133                 if (authorizationString != null) {\r
134                     System.out.println(authorizationString);\r
135                         httpUrlConnection.setRequestProperty("Authorization", authorizationString);\r
136                 }\r
137                 httpUrlConnection.setRequestProperty("Accept", "application/json");\r
138                 httpUrlConnection.setUseCaches(false);\r
139                 httpUrlConnection.setConnectTimeout(connectTimeout);\r
140                 httpUrlConnection.setReadTimeout(readTimeout);\r
141                 configureHttpURLConnection(httpUrlConnection);\r
142                 return httpUrlConnection;\r
143         }\r
144 \r
145         protected String readFromStream(InputStream stream) throws IOException {\r
146                 BufferedReader br = new BufferedReader(new InputStreamReader(stream));\r
147                 StringBuilder sb = new StringBuilder();\r
148                 String line;\r
149                 while ((line = br.readLine()) != null) {\r
150                         sb.append(line);\r
151                 }\r
152                 br.close();\r
153                 return sb.toString();\r
154         }\r
155 \r
156 }