4efb77c5af59a5145e3bef0fd2cef21bc0252de2
[ccsdk/sli/adaptors.git] /
1 /**\r
2  * ============LICENSE_START====================================================\r
3  * org.onap.aaf\r
4  * ===========================================================================\r
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.\r
6  *\r
7  * Modifications Copyright (C) 2019 IBM.\r
8  * ===========================================================================\r
9  * Licensed under the Apache License, Version 2.0 (the "License");\r
10  * you may not use this file except in compliance with the License.\r
11  * You may obtain a copy of the License at\r
12  * \r
13  *      http://www.apache.org/licenses/LICENSE-2.0\r
14  * \r
15  * Unless required by applicable law or agreed to in writing, software\r
16  * distributed under the License is distributed on an "AS IS" BASIS,\r
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
18  * See the License for the specific language governing permissions and\r
19  * limitations under the License.\r
20  * ============LICENSE_END====================================================\r
21  *\r
22  */\r
23 package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.provider.impl;\r
24 \r
25 import java.io.BufferedReader;\r
26 import java.io.DataOutputStream;\r
27 import java.io.IOException;\r
28 import java.io.InputStream;\r
29 import java.io.InputStreamReader;\r
30 import java.net.HttpURLConnection;\r
31 import java.net.SocketException;\r
32 import java.net.URL;\r
33 import java.util.Base64;\r
34 \r
35 import org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api.PublisherApi;\r
36 import org.slf4j.Logger;\r
37 import org.slf4j.LoggerFactory;\r
38 \r
39 public class PublisherApiImpl implements PublisherApi {\r
40         private static final Logger logger = LoggerFactory.getLogger(PublisherApiImpl.class);\r
41         protected static final Integer DEFAULT_CONNECT_TIMEOUT = 30000; // will be treated as 30 seconds\r
42         protected static final Integer DEFAULT_READ_TIMEOUT = 180000; // will be treated as 3 minutes\r
43         private String authorizationString;\r
44         protected Integer connectTimeout;\r
45         protected Integer readTimeout;\r
46         protected String baseUrl;\r
47         protected String username;\r
48         protected String[] hosts;\r
49         private String password;\r
50 \r
51         public void setUsername(String username) {\r
52                 this.username = username;\r
53                 setAuthorizationString();\r
54         }\r
55 \r
56         public void setPassword(String password) {\r
57                 this.password = password;\r
58                 setAuthorizationString();\r
59         }\r
60 \r
61         public void setHost(String hostString) {\r
62                 // a comma separated list of hosts can be passed in or a single host may be used\r
63                 if (!hostString.contains(",")) {\r
64                         this.hosts = new String[] { hostString };\r
65                 } else {\r
66                         this.hosts = hostString.split(",");\r
67                 }\r
68         }\r
69 \r
70         public PublisherApiImpl() {\r
71                 connectTimeout = DEFAULT_CONNECT_TIMEOUT;\r
72                 readTimeout = DEFAULT_READ_TIMEOUT;\r
73         }\r
74 \r
75         public void init() {\r
76                 setAuthorizationString();\r
77         }\r
78 \r
79         protected String buildUrlString(Integer hostIndex, String topic) {\r
80                 return hosts[hostIndex] + "/events/" + topic;\r
81         }\r
82 \r
83         protected void configureHttpURLConnection(HttpURLConnection httpUrlConnection) {\r
84                 httpUrlConnection.setRequestProperty("Content-Type", "application/json");\r
85         }\r
86 \r
87         public Boolean publish(String topic, String body) {\r
88                 for (int hostIndex = 0; hostIndex < hosts.length; hostIndex++) {\r
89                         HttpURLConnection httpUrlConnection = null;\r
90                         URL url = null;\r
91                         try {\r
92                                 url = new URL(buildUrlString(hostIndex, topic));\r
93                                 logger.info("Publishing body to topic {} using the url {}", topic, url);\r
94                                 logger.info("Message to publish is\n{}", body);\r
95                                 httpUrlConnection = buildHttpURLConnection(url);\r
96                                 httpUrlConnection.setDoInput(true);\r
97                                 httpUrlConnection.setDoOutput(true);\r
98                                 httpUrlConnection.setUseCaches(false);\r
99                                 httpUrlConnection.setRequestMethod("POST");\r
100 \r
101                                 // Write message\r
102                                 httpUrlConnection.setRequestProperty("Content-Length", Integer.toString(body.length()));\r
103                                 DataOutputStream outStr = new DataOutputStream(httpUrlConnection.getOutputStream());\r
104                                 outStr.write(body.getBytes());\r
105                                 outStr.close();\r
106 \r
107                                 int status = httpUrlConnection.getResponseCode();\r
108                                 logger.info("Publishing body for topic {} using  url {} returned status {}.", topic, url, status);\r
109                                 if (status < 300) {\r
110                                         String responseFromDMaaP = readFromStream(httpUrlConnection.getInputStream());\r
111                                         logger.info("Message router response is\n{}", responseFromDMaaP);\r
112                                         return true;\r
113                                 } else {\r
114                                         if (httpUrlConnection.getErrorStream() != null) {\r
115                                                 String responseFromDMaaP = readFromStream(httpUrlConnection.getErrorStream());\r
116                                                 logger.warn("Publishing body for topic {} using  url {} failed." + " Error message is\n{}",\r
117                                                                 topic, url, responseFromDMaaP);\r
118                                         }\r
119                                         return false;\r
120                                 }\r
121 \r
122                         } catch (SocketException socketException) {\r
123                                 logger.error("SocketException was thrown during publishing message to DMaaP on url {}.", url,\r
124                                                 socketException);\r
125                                 if (hostIndex < hosts.length) {\r
126                                         logger.info("Message sent to {} failed with a SocketException, but will be tried on {}",\r
127                                                         hosts[hostIndex], hosts[hostIndex + 1]);\r
128                                 }\r
129                         } catch (Exception e) {\r
130                                 logger.warn("Exception was thrown during publishing message to DMaaP on url {}.", url, e);\r
131                                 return false;\r
132                         } finally {\r
133                                 if (httpUrlConnection != null) {\r
134                                         httpUrlConnection.disconnect();\r
135                                 }\r
136                         }\r
137                 }\r
138                 return false;\r
139         }\r
140 \r
141         protected void setAuthorizationString() {\r
142             String str = buildAuthorizationString(this.username, this.password);\r
143                 this.authorizationString = str;\r
144                 //System.out.println(this.authorizationString);\r
145         }\r
146         \r
147          protected String buildAuthorizationString(String username, String password) {\r
148              String basicAuthString = username + ":" + password;\r
149              basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());\r
150              return "Basic " + basicAuthString;\r
151         }\r
152 \r
153         protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException {\r
154                 HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();\r
155                 if (authorizationString != null) {\r
156                     System.out.println(authorizationString);\r
157                         httpUrlConnection.setRequestProperty("Authorization", authorizationString);\r
158                 }\r
159                 httpUrlConnection.setRequestProperty("Accept", "application/json");\r
160                 httpUrlConnection.setUseCaches(false);\r
161                 httpUrlConnection.setConnectTimeout(connectTimeout);\r
162                 httpUrlConnection.setReadTimeout(readTimeout);\r
163                 configureHttpURLConnection(httpUrlConnection);\r
164                 return httpUrlConnection;\r
165         }\r
166 \r
167         protected String readFromStream(InputStream stream) throws IOException {\r
168                 BufferedReader br = new BufferedReader(new InputStreamReader(stream));\r
169                 StringBuilder sb = new StringBuilder();\r
170                 String line;\r
171                 while ((line = br.readLine()) != null) {\r
172                         sb.append(line);\r
173                 }\r
174                 br.close();\r
175                 return sb.toString();\r
176         }\r
177 \r
178 }