2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSession;

import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.ConsumerApi;
import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.JsonParseException;
/**
 * java.net based client used as the base class for building MessageRouter consumers.
 */
48 public abstract class AbstractBaseConsumer implements ConsumerApi {
49 private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseConsumer.class);
50 private static final String REQUEST_METHOD = "GET";
52 private final String host;
53 private final Integer connectTimeout;
54 private final Integer readTimeout;
55 private final String group;
56 private final String id;
57 private final String filter;
58 private final Integer limit;
59 private final Integer timeoutQueryParamValue;
60 private final String authorizationString;
62 protected RequestHandler requestHandler;
64 protected String topic;
/**
 * Stores the connection settings and prepares the Authorization header value.
 *
 * @param username               user name, required when {@code authentication} is "basic"
 * @param password               password, required when {@code authentication} is "basic"
 * @param host                   prefix of the poll URL (everything in front of "/events/")
 * @param authentication         either "basic" or "noauth"
 * @param connectTimeout         connect timeout applied to each HTTP connection
 * @param readTimeout            read timeout applied to each HTTP connection
 * @param group                  group segment of the poll URL
 * @param id                     id segment of the poll URL
 * @param filter                 value of the "filter" query parameter, or null to omit it
 * @param limit                  value of the "limit" query parameter
 * @param timeoutQueryParamValue value of the "timeout" query parameter
 * @throws IllegalStateException if {@code authentication} is "basic" but the user name or
 *                               password is null or empty, or if {@code authentication} is
 *                               neither "basic" nor "noauth"
 */
public AbstractBaseConsumer(String username, String password, String host, String authentication,
        Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit,
        Integer timeoutQueryParamValue) {
    this.host = host;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.group = group;
    this.id = id;
    this.filter = filter;
    this.limit = limit;
    this.timeoutQueryParamValue = timeoutQueryParamValue;

    if ("basic".equals(authentication)) {
        boolean credentialsPresent = username != null && password != null
                && !username.isEmpty() && !password.isEmpty();
        if (!credentialsPresent) {
            throw new IllegalStateException("Authentication is set to basic but username or password is missing");
        }
        authorizationString = buildAuthorizationString(username, password);
    } else if ("noauth".equals(authentication)) {
        // No Authorization header is sent in this mode.
        authorizationString = null;
    } else {
        throw new IllegalStateException("Unknown authentication method: " + authentication);
    }
}
89 protected void poll() {
90 String responseBody = performHttpOperation();
91 if (responseBody != null && !responseBody.startsWith("[]")) {
92 LOG.info("New message was fetched from MessageRouter.");
93 LOG.trace("Fetched message is\n{}", responseBody);
95 String[] requests = new Gson().fromJson(responseBody, String[].class);
96 if (requests != null) {
97 for (String request : requests) {
98 if (request != null) {
99 requestHandler.handleRequest(topic,request);
103 LOG.warn("Deserialization of received message results in null array.", responseBody);
105 } catch (JsonParseException e) {
106 LOG.warn("Received message has bad format. Expected format is JSON.");
109 LOG.trace("No new message was fetched from MessageRouter.");
113 private String buildlUrlString(String topic) {
114 StringBuilder sb = new StringBuilder();
115 sb.append(host + "/events/" + topic + "/" + group + "/" + id);
116 sb.append("?timeout=" + timeoutQueryParamValue);
119 sb.append("&limit=" + limit);
121 if (filter != null) {
122 sb.append("&filter=" + filter);
124 return sb.toString();
127 private String performHttpOperation() {
128 HttpURLConnection httpUrlConnection = null;
130 httpUrlConnection = buildHttpURLConnection(url);
131 httpUrlConnection.setRequestMethod(REQUEST_METHOD);
132 httpUrlConnection.connect();
133 int status = httpUrlConnection.getResponseCode();
135 return readFromStream(httpUrlConnection.getInputStream());
137 String response = readFromStream(httpUrlConnection.getErrorStream());
138 LOG.warn("Fetching message from MessageRouter on url {} failed with http status {}. Error message is\n{}.", url, status, response);
140 } catch (Exception e) {
141 LOG.warn("Exception was thrown during fetching message from MessageRouter on url {}.", url, e);
143 if (httpUrlConnection != null) {
144 httpUrlConnection.disconnect();
150 private String buildAuthorizationString(String userName, String password) {
151 String basicAuthString = userName + ":" + password;
152 basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes());
153 return "Basic " + basicAuthString;
156 protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException {
157 HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection();
158 if (authorizationString != null) {
159 httpUrlConnection.setRequestProperty("Authorization", authorizationString);
161 httpUrlConnection.setRequestProperty("Accept", "application/json");
162 httpUrlConnection.setUseCaches(false);
163 httpUrlConnection.setConnectTimeout(connectTimeout);
164 httpUrlConnection.setReadTimeout(readTimeout);
166 // ignore hostname errors when dealing with HTTPS connections
167 if (httpUrlConnection instanceof HttpsURLConnection) {
168 HttpsURLConnection conn = (HttpsURLConnection) httpUrlConnection;
169 conn.setHostnameVerifier(new HostnameVerifier() {
171 public boolean verify(String arg0, SSLSession arg1) {
176 return httpUrlConnection;
179 protected String readFromStream(InputStream stream) throws IOException {
180 BufferedReader br = new BufferedReader(new InputStreamReader(stream));
181 StringBuilder sb = new StringBuilder();
183 while ((line = br.readLine()) != null) {
188 return sb.toString();
192 public void registerHandler(String topic, RequestHandler requestHandler) {
195 this.url = new URL(buildlUrlString(topic));
196 } catch (MalformedURLException e) {
197 LOG.error("Topic " + topic + " resulted in MalformedURLException", e);
199 this.requestHandler = requestHandler;
203 public void close() throws Exception {
204 //BaseConsumer doesn't spawn any threads