1 /*******************************************************************************
2 * Copyright 2016-2017 ZTE, Inc. and others.
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
13 ******************************************************************************/
14 package org.onap.msb.apiroute.wrapper.consulextend.util;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.eclipse.jetty.http.HttpStatus;
import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseCallback;
import org.onap.msb.apiroute.wrapper.consulextend.async.ConsulResponseHeader;
import org.onap.msb.apiroute.wrapper.consulextend.async.OriginalConsulResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.orbitz.consul.ConsulException;
import com.orbitz.consul.model.ConsulResponse;
import com.orbitz.consul.option.CatalogOptions;
import com.orbitz.consul.option.QueryOptions;
import com.orbitz.consul.util.Jackson;
48 private static final Logger LOGGER = LoggerFactory.getLogger(Http.class);
50 private final static CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom()
51 .setMaxConnTotal(Integer.MAX_VALUE).setMaxConnPerRoute(Integer.MAX_VALUE).build();
53 private static Http instance = null;
57 public static Http getInstance() {
58 if (instance == null) {
59 instance = new Http();
60 httpAsyncClient.start();
66 // async get data from consul,and handle response immediately
67 public <T> void asyncGet(String requestURI, final TypeReference<T> responseType,
68 final ConsulResponseCallback<T> callback, final Integer... okCodes) {
69 // LOGGER.info("Async request:"+requestURI);
71 httpAsyncClient.execute(new HttpGet(requestURI), new FutureCallback<HttpResponse>() {
73 public void completed(final HttpResponse response) {
74 callback.onComplete(consulResponse(responseType, response));
77 public void failed(final Exception ex) {
78 callback.onFailure(ex);
81 public void cancelled() {
82 LOGGER.warn("cancelled async request");
87 // async get data from consul,and handle response delay
88 public <T> void asyncGetDelayHandle(String requestURI, final TypeReference<T> responseType,
89 final ConsulResponseCallback<T> callback, final Integer... okCodes) {
91 httpAsyncClient.execute(new HttpGet(requestURI), new FutureCallback<HttpResponse>() {
93 public void completed(final HttpResponse response) {
94 OriginalConsulResponse<T> originalConsulResponse =
95 new OriginalConsulResponse<T>(response, responseType);
97 // handle not 2xx code
98 if (!isSuccessful(response)) {
100 LOGGER.warn("response statuscode:" + response.getStatusLine().getStatusCode());
102 callback.onFailure(new ConsulException(
103 "response statuscode:" + response.getStatusLine().getStatusCode()));
105 callback.onDelayComplete(originalConsulResponse);
110 public void failed(final Exception ex) {
111 callback.onFailure(ex);
114 public void cancelled() {
115 LOGGER.warn("cancelled async request");
120 public static ConsulResponseHeader consulResponseHeader(HttpResponse response) {
121 String indexHeaderValue = response.getFirstHeader("X-Consul-Index").getValue();
122 String lastContactHeaderValue = response.getFirstHeader("X-Consul-Lastcontact").getValue();
123 String knownLeaderHeaderValue = response.getFirstHeader("X-Consul-Knownleader").getValue();
125 BigInteger index = indexHeaderValue == null ? new BigInteger("0") : new BigInteger(indexHeaderValue);
126 long lastContact = lastContactHeaderValue == null ? 0 : Long.parseLong(lastContactHeaderValue);
127 boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean.parseBoolean(knownLeaderHeaderValue);
129 return new ConsulResponseHeader(lastContact, knownLeader, index);
132 public static <T> ConsulResponse<T> consulResponse(TypeReference<T> responseType, HttpResponse response) {
134 String indexHeaderValue = response.getFirstHeader("X-Consul-Index").getValue();
135 String lastContactHeaderValue = response.getFirstHeader("X-Consul-Lastcontact").getValue();
136 String knownLeaderHeaderValue = response.getFirstHeader("X-Consul-Knownleader").getValue();
138 BigInteger index = indexHeaderValue == null ? new BigInteger("0") : new BigInteger(indexHeaderValue);
139 long lastContact = lastContactHeaderValue == null ? 0 : Long.parseLong(lastContactHeaderValue);
140 boolean knownLeader = knownLeaderHeaderValue == null ? false : Boolean.parseBoolean(knownLeaderHeaderValue);
142 ConsulResponse<T> consulResponse =
143 new ConsulResponse<T>(readResponse(response, responseType), lastContact, knownLeader, index);
144 return consulResponse;
147 @SuppressWarnings({"unchecked", "rawtypes"})
148 public static <T> T readResponse(HttpResponse response, TypeReference<T> responseType) {
150 // read streamed entity
153 // HttpEntity,read original data.
154 Type _type = responseType.getType();
155 if (_type instanceof Class && (((Class) _type).isAssignableFrom(HttpEntity.class))) {
156 object = (T) response.getEntity();
160 // String,read original data.
161 if (_type instanceof Class && (((Class) _type).isAssignableFrom(String.class))) {
165 object = (T) IOUtils.toString(response.getEntity().getContent());
166 response.getEntity().getContent().close();
168 } catch (UnsupportedOperationException e) {
170 LOGGER.warn("covert streamed entity to String exception:", e);
171 } catch (IOException e) {
173 LOGGER.warn("covert streamed entity to String exception:", e);
181 object = Jackson.MAPPER.readValue(response.getEntity().getContent(), responseType);
182 } catch (IOException e) {
183 LOGGER.warn("covert streamed entity to object exception:", e);
184 object = readDefaultResponse(responseType);
190 @SuppressWarnings("unchecked")
191 public static <T> T readDefaultResponse(TypeReference<T> responseType) {
192 Type _type = responseType.getType();
193 if (_type instanceof ParameterizedType && ((ParameterizedType) _type).getRawType() == List.class) {
194 return (T) ImmutableList.of();
195 } else if (_type instanceof ParameterizedType && ((ParameterizedType) _type).getRawType() == Map.class) {
196 return (T) ImmutableMap.of();
198 // Not sure if this case will be reached, but if it is it'll be nice
200 throw new IllegalStateException("Cannot determine empty representation for " + _type);
204 public static boolean isSuccessful(HttpResponse response, Integer... okCodes) {
205 return HttpStatus.isSuccess(response.getStatusLine().getStatusCode())
206 || Sets.newHashSet(okCodes).contains(response.getStatusLine().getStatusCode());
209 public static String optionsFrom(CatalogOptions catalogOptions, QueryOptions queryOptions) {
212 if (catalogOptions != null) {
213 Map<String, Object> options = catalogOptions.toQuery();
215 if (options.containsKey("dc")) {
216 params += "dc=" + options.get("dc");
218 if (options.containsKey("tag")) {
219 params += params.isEmpty() ? "" : "&";
220 params += "tag=" + options.get("tag");
224 if (queryOptions != null) {
225 Map<String, Object> options = queryOptions.toQuery();
227 if (options.containsKey("consistent")) {
228 params += params.isEmpty() ? "" : "&";
229 params += "consistent=" + options.get("consistent");
231 if (options.containsKey("stale")) {
232 params += params.isEmpty() ? "" : "&";
233 params += "stale=" + options.get("stale");
235 if (options.containsKey("wait")) {
236 params += params.isEmpty() ? "" : "&";
237 params += "wait=" + options.get("wait");
240 if (options.containsKey("index")) {
241 params += params.isEmpty() ? "" : "&";
242 params += "index=" + options.get("index");
244 if (options.containsKey("token")) {
245 params += params.isEmpty() ? "" : "&";
246 params += "token=" + options.get("token");
248 if (options.containsKey("near")) {
249 params += params.isEmpty() ? "" : "&";
250 params += "near=" + options.get("near");
252 if (options.containsKey("dc")) {
253 params += params.isEmpty() ? "" : "&";
254 params += "dc=" + options.get("dc");