2  * ========================LICENSE_START=================================
 
   4  * ======================================================================
 
   5  * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
 
   6  * ======================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ========================LICENSE_END===================================
 
  21 package org.onap.ccsdk.oran.a1policymanagementservice.clients;
 
  23 import io.netty.channel.ChannelOption;
 
  24 import io.netty.handler.ssl.SslContext;
 
  25 import io.netty.handler.timeout.ReadTimeoutHandler;
 
  26 import io.netty.handler.timeout.WriteTimeoutHandler;
 
  28 import java.lang.invoke.MethodHandles;
 
  29 import java.util.concurrent.atomic.AtomicInteger;
 
  31 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig.HttpProxyConfig;
 
  32 import org.slf4j.Logger;
 
  33 import org.slf4j.LoggerFactory;
 
  34 import org.springframework.http.MediaType;
 
  35 import org.springframework.http.ResponseEntity;
 
  36 import org.springframework.http.client.reactive.ReactorClientHttpConnector;
 
  37 import org.springframework.lang.Nullable;
 
  38 import org.springframework.web.reactive.function.client.ExchangeStrategies;
 
  39 import org.springframework.web.reactive.function.client.WebClient;
 
  40 import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
 
  41 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
  43 import reactor.core.publisher.Mono;
 
  44 import reactor.netty.http.client.HttpClient;
 
  45 import reactor.netty.resources.ConnectionProvider;
 
  46 import reactor.netty.tcp.ProxyProvider.Proxy;
 
  47 import reactor.netty.tcp.TcpClient;
 
  50  * Generic reactive REST client.
 
  52 public class AsyncRestClient {
 
  54     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
  55     private WebClient webClient = null;
 
  56     private final String baseUrl;
 
  57     private static final AtomicInteger sequenceNumber = new AtomicInteger();
 
  58     private final SslContext sslContext;
 
  59     private final HttpProxyConfig httpProxyConfig;
 
  61     public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) {
 
  62         this.baseUrl = baseUrl;
 
  63         this.sslContext = sslContext;
 
  64         this.httpProxyConfig = httpProxyConfig;
 
  67     public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
 
  68         Object traceTag = createTraceTag();
 
  69         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
 
  70         logger.trace("{} POST body: {}", traceTag, body);
 
  71         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
 
  72         return getWebClient() //
 
  74                     RequestHeadersSpec<?> request = client.post() //
 
  76                             .contentType(MediaType.APPLICATION_JSON) //
 
  77                             .body(bodyProducer, String.class);
 
  78                     return retrieve(traceTag, request);
 
  82     public Mono<String> post(String uri, @Nullable String body) {
 
  83         return postForEntity(uri, body) //
 
  84                 .flatMap(this::toBody);
 
  87     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
 
  88         Object traceTag = createTraceTag();
 
  89         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
 
  90         logger.trace("{} POST body: {}", traceTag, body);
 
  91         return getWebClient() //
 
  93                     RequestHeadersSpec<?> request = client.post() //
 
  95                             .headers(headers -> headers.setBasicAuth(username, password)) //
 
  96                             .contentType(MediaType.APPLICATION_JSON) //
 
  98                     return retrieve(traceTag, request) //
 
  99                             .flatMap(this::toBody);
 
 103     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
 
 104         Object traceTag = createTraceTag();
 
 105         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
 
 106         logger.trace("{} PUT body: {}", traceTag, body);
 
 107         return getWebClient() //
 
 109                     RequestHeadersSpec<?> request = client.put() //
 
 111                             .contentType(MediaType.APPLICATION_JSON) //
 
 113                     return retrieve(traceTag, request);
 
 117     public Mono<ResponseEntity<String>> putForEntity(String uri) {
 
 118         Object traceTag = createTraceTag();
 
 119         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
 
 120         logger.trace("{} PUT body: <empty>", traceTag);
 
 121         return getWebClient() //
 
 123                     RequestHeadersSpec<?> request = client.put() //
 
 125                     return retrieve(traceTag, request);
 
 129     public Mono<String> put(String uri, String body) {
 
 130         return putForEntity(uri, body) //
 
 131                 .flatMap(this::toBody);
 
 134     public Mono<ResponseEntity<String>> getForEntity(String uri) {
 
 135         Object traceTag = createTraceTag();
 
 136         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
 
 137         return getWebClient() //
 
 139                     RequestHeadersSpec<?> request = client.get().uri(uri);
 
 140                     return retrieve(traceTag, request);
 
 144     public Mono<String> get(String uri) {
 
 145         return getForEntity(uri) //
 
 146                 .flatMap(this::toBody);
 
 149     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
 
 150         Object traceTag = createTraceTag();
 
 151         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
 
 152         return getWebClient() //
 
 154                     RequestHeadersSpec<?> request = client.delete().uri(uri);
 
 155                     return retrieve(traceTag, request);
 
 159     public Mono<String> delete(String uri) {
 
 160         return deleteForEntity(uri) //
 
 161                 .flatMap(this::toBody);
 
 164     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
 
 165         final Class<String> clazz = String.class;
 
 166         return request.retrieve() //
 
 168                 .doOnNext(entity -> logReceivedData(traceTag, entity)) //
 
 169                 .doOnError(throwable -> onHttpError(traceTag, throwable));
 
 172     private void logReceivedData(Object traceTag, ResponseEntity<String> entity) {
 
 173         logger.trace("{} Received: {} {}", traceTag, entity.getBody(), entity.getHeaders().getContentType());
 
 176     private static Object createTraceTag() {
 
 177         return sequenceNumber.incrementAndGet();
 
 180     private void onHttpError(Object traceTag, Throwable t) {
 
 181         if (t instanceof WebClientResponseException) {
 
 182             WebClientResponseException exception = (WebClientResponseException) t;
 
 183             logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
 
 184                     exception.getResponseBodyAsString());
 
 186             logger.debug("{} HTTP error {}", traceTag, t.getMessage());
 
 190     private Mono<String> toBody(ResponseEntity<String> entity) {
 
 191         if (entity.getBody() == null) {
 
 192             return Mono.just("");
 
 194             return Mono.just(entity.getBody());
 
 198     private boolean isHttpProxyConfigured() {
 
 199         return httpProxyConfig != null && httpProxyConfig.httpProxyPort() > 0
 
 200                 && !httpProxyConfig.httpProxyHost().isEmpty();
 
 203     private TcpClient createTcpClient() {
 
 204         TcpClient client = TcpClient.create(ConnectionProvider.newConnection()) //
 
 205                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
 
 206                 .doOnConnected(connection -> {
 
 207                     connection.addHandlerLast(new ReadTimeoutHandler(30));
 
 208                     connection.addHandlerLast(new WriteTimeoutHandler(30));
 
 210         if (this.sslContext != null) {
 
 211             client = client.secure(c -> c.sslContext(sslContext));
 
 213         if (isHttpProxyConfigured()) {
 
 214             client = client.proxy(proxy -> proxy.type(Proxy.HTTP).host(httpProxyConfig.httpProxyHost())
 
 215                     .port(httpProxyConfig.httpProxyPort()));
 
 220     private WebClient createWebClient(String baseUrl, TcpClient tcpClient) {
 
 221         HttpClient httpClient = HttpClient.from(tcpClient);
 
 223         ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
 
 224         ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //
 
 225                 .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //
 
 227         return WebClient.builder() //
 
 228                 .clientConnector(connector) //
 
 230                 .exchangeStrategies(exchangeStrategies) //
 
 234     private Mono<WebClient> getWebClient() {
 
 235         if (this.webClient == null) {
 
 237                 TcpClient tcpClient = createTcpClient();
 
 238                 this.webClient = createWebClient(this.baseUrl, tcpClient);
 
 239             } catch (Exception e) {
 
 240                 logger.error("Could not create WebClient {}", e.getMessage());
 
 241                 return Mono.error(e);
 
 244         return Mono.just(this.webClient);