8937f7b914b767bc6a660433846a24ae28238d5d
[ccsdk/sli.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : SDN-C
4  * ================================================================================
5  * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
6  *                      reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
23
24 import java.io.BufferedReader;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.net.HttpURLConnection;
29 import java.net.MalformedURLException;
30 import java.net.URL;
31 import java.util.Base64;
32
33 import javax.net.ssl.HostnameVerifier;
34 import javax.net.ssl.HttpsURLConnection;
35 import javax.net.ssl.SSLSession;
36
37 import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.ConsumerApi;
38 import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.gson.Gson;
43 import com.google.gson.JsonParseException;
44
45 /*
46  * java.net based client to build message router consumers
47  */
48 public abstract class AbstractBaseConsumer implements ConsumerApi {
49     private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseConsumer.class);
50     private static final String REQUEST_METHOD = "GET";
51
52     private final String host;
53     private final Integer connectTimeout;
54     private final Integer readTimeout;
55     private final String group;
56     private final String id;
57     private final String filter;
58     private final Integer limit;
59     private final Integer timeoutQueryParamValue;
60     private final String authorizationString;
61
62     protected RequestHandler requestHandler;
63     protected URL url;
64     protected String topic;
65     
66     public AbstractBaseConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) {
67         this.host = host;
68         this.connectTimeout = connectTimeout;
69         this.readTimeout = readTimeout;
70         this.group = group;
71         this.id = id;
72         this.filter = filter;
73         this.limit = limit;
74         this.timeoutQueryParamValue = timeoutQueryParamValue;
75
76         if ("basic".equals(authentication)) {
77             if (username != null && password != null && username.length() > 0 && password.length() > 0) {
78                 authorizationString = buildAuthorizationString(username, password);
79             } else {
80                 throw new IllegalStateException("Authentication is set to basic but username or password is missing");
81             }
82         } else if ("noauth".equals(authentication)) {
83             authorizationString = null;
84         } else {
85             throw new IllegalStateException("Unknown authentication method: " + authentication);
86         }
87     }
88
89     protected void poll() {
90         String responseBody = performHttpOperation();
91         if (responseBody != null && !responseBody.startsWith("[]")) {
92             LOG.info("New message was fetched from MessageRouter.");
93             LOG.trace("Fetched message is\n{}", responseBody);
94             try {
95                 String[] requests = new Gson().fromJson(responseBody, String[].class);
96                 if (requests != null) {
97                     for (String request : requests) {
98                         if (request != null) {
99                             requestHandler.handleRequest(topic,request);
100                         }
101                     }
102                 } else {
103                     LOG.warn("Deserialization of received message results in null array.", responseBody);
104                 }
105             } catch (JsonParseException e) {
106                 LOG.warn("Received message has bad format. Expected format is JSON.");
107             }
108         } else {
109             LOG.trace("No new message was fetched from MessageRouter.");
110         }
111     }
112
113     private String buildlUrlString(String topic) {
114         StringBuilder sb = new StringBuilder();
115         sb.append(host + "/events/" + topic + "/" + group + "/" + id);
116         sb.append("?timeout=" + timeoutQueryParamValue);
117
118         if (limit != null) {
119             sb.append("&limit=" + limit);
120         }
121         if (filter != null) {
122             sb.append("&filter=" + filter);
123         }
124         return sb.toString();
125     }
126
127     private String performHttpOperation() {
128         HttpURLConnection httpUrlConnection = null;
129         try {
130             httpUrlConnection = buildHttpURLConnection(url);
131             httpUrlConnection.setRequestMethod(REQUEST_METHOD);
132             httpUrlConnection.connect();
133             int status = httpUrlConnection.getResponseCode();
134             if (status < 300) {
135                 return readFromStream(httpUrlConnection.getInputStream());
136             } else {
137                 String response = readFromStream(httpUrlConnection.getErrorStream());
138                 LOG.warn("Fetching message from MessageRouter on url {} failed with http status {}. Error message is\n{}.", url, status, response);
139             }
140         } catch (Exception e) {
141             LOG.warn("Exception was thrown during fetching message from MessageRouter on url {}.", url, e);
142         } finally {
143             if (httpUrlConnection != null) {
144                 httpUrlConnection.disconnect();
145             }
146         }
147         return null;
148     }
149
150     private String buildAuthorizationString(String userName, String password) {
151         String basicAuthString = userName + ":" + password;
152         basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
153         return "Basic " + basicAuthString;
154     }
155
156     protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException {
157         HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
158         if (authorizationString != null) {
159             httpUrlConnection.setRequestProperty("Authorization", authorizationString);
160         }
161         httpUrlConnection.setRequestProperty("Accept", "application/json");
162         httpUrlConnection.setUseCaches(false);
163         httpUrlConnection.setConnectTimeout(connectTimeout);
164         httpUrlConnection.setReadTimeout(readTimeout);
165
166         // ignore hostname errors when dealing with HTTPS connections
167         if (httpUrlConnection instanceof HttpsURLConnection) {
168             HttpsURLConnection conn = (HttpsURLConnection) httpUrlConnection;
169             conn.setHostnameVerifier(new HostnameVerifier() {
170                 @Override
171                 public boolean verify(String arg0, SSLSession arg1) {
172                     return true;
173                 }
174             });
175         }
176         return httpUrlConnection;
177     }
178
179     protected String readFromStream(InputStream stream) throws IOException {
180         BufferedReader br = new BufferedReader(new InputStreamReader(stream));
181         StringBuilder sb = new StringBuilder();
182         String line;
183         while ((line = br.readLine()) != null) {
184             sb.append(line);
185             sb.append("\n");
186         }
187         br.close();
188         return sb.toString();
189     }
190
191     @Override
192     public void registerHandler(String topic, RequestHandler requestHandler) {
193         this.topic = topic;
194         try {
195             this.url = new URL(buildlUrlString(topic));
196         } catch (MalformedURLException e) {
197             LOG.error("Topic " + topic + " resulted in MalformedURLException", e);
198         }
199         this.requestHandler = requestHandler;
200     }
201
202     @Override
203     public void close() throws Exception {
204         //BaseConsumer doesn't spawn any threads
205     }
206
207 }