2  * ========================LICENSE_START=================================
 
   4  * ======================================================================
 
   5  * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
 
   6  * Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
 
   7  * ======================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ========================LICENSE_END===================================
 
  22 package org.onap.ccsdk.oran.a1policymanagementservice.clients;
 
  24 import io.netty.channel.ChannelOption;
 
  25 import io.netty.handler.ssl.SslContext;
 
  26 import io.netty.handler.timeout.ReadTimeoutHandler;
 
  27 import io.netty.handler.timeout.WriteTimeoutHandler;
 
  28 import io.opentelemetry.instrumentation.spring.webflux.v5_3.SpringWebfluxTelemetry;
 
  30 import java.lang.invoke.MethodHandles;
 
  31 import java.util.concurrent.atomic.AtomicInteger;
 
  33 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationContextProvider;
 
  34 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.OtelConfig;
 
  35 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig.HttpProxyConfig;
 
  36 import org.slf4j.Logger;
 
  37 import org.slf4j.LoggerFactory;
 
  38 import org.springframework.http.MediaType;
 
  39 import org.springframework.http.ResponseEntity;
 
  40 import org.springframework.http.client.reactive.ReactorClientHttpConnector;
 
  41 import org.springframework.lang.Nullable;
 
  42 import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 
  43 import org.springframework.web.reactive.function.client.ExchangeStrategies;
 
  44 import org.springframework.web.reactive.function.client.WebClient;
 
  45 import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
 
  46 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
  48 import reactor.core.publisher.Mono;
 
  49 import reactor.netty.http.client.HttpClient;
 
  50 import reactor.netty.transport.ProxyProvider;
 
  53  * Generic reactive REST client.
 
  55 public class AsyncRestClient {

         // SLF4J logger named after the class returned by MethodHandles.lookup().lookupClass().
  57     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

         // Starts as null; getWebClient() assigns it the first time a client is requested.
  58     private WebClient webClient = null;

         // Passed to buildWebClient() when the WebClient is created.
  59     private final String baseUrl;

         // Static counter (shared by all instances) that createTraceTag() increments.
  60     private static final AtomicInteger sequenceNumber = new AtomicInteger();

         // May be null (see the @Nullable constructor parameter); buildHttpClient() only applies it when non-null.
  61     private final SslContext sslContext;

         // May be null (see the @Nullable constructor parameter); checked by isHttpProxyConfigured().
  62     private final HttpProxyConfig httpProxyConfig;

         // Consulted by retrieve() to decide whether a Bearer token header is added.
  63     private final SecurityContext securityContext;

         // Looked up from the application context when the instance is created; buildWebClient() reads
         // isTracingEnabled() from it.
  64     private OtelConfig otelConfig = ApplicationContextProvider.getApplicationContext().getBean(OtelConfig.class);
 
  66     public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig,

  67             SecurityContext securityContext) {
             // Stores the four arguments in the corresponding fields; no WebClient is built here.
             // sslContext and httpProxyConfig are declared @Nullable and are stored as given.

  68         this.baseUrl = baseUrl;

  69         this.sslContext = sslContext;

  70         this.httpProxyConfig = httpProxyConfig;

  71         this.securityContext = securityContext;
 
  74     public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
             // Builds a request with a JSON content type and the given body, then hands it to retrieve(),
             // which yields the complete response entity.
             // NOTE(review): the calls that select the HTTP method and apply `uri` are not visible in this
             // listing - confirm against the complete file.

             // A null body is turned into an empty Mono; otherwise the body is wrapped with Mono.just().
  75         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();

  77         RequestHeadersSpec<?> request = getWebClient() //

  80                 .contentType(MediaType.APPLICATION_JSON) //

  81                 .body(bodyProducer, String.class);

  82         return retrieve(request);
 
  85     public Mono<String> post(String uri, @Nullable String body) {
             // Delegates to postForEntity(uri, body) and returns a Mono<String>.
             // NOTE(review): the rest of this call chain is not visible in this listing; presumably it
             // converts the response entity to its body - confirm against the complete file.

  86         return postForEntity(uri, body) //
 
  90     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
             // Builds a request carrying an HTTP Basic authentication header for username/password and a
             // JSON content type, then passes it to retrieve().
             // NOTE(review): the calls that select the HTTP method, apply `uri`, attach `body`, and finish
             // the chain after retrieve(request) are not visible in this listing - confirm against the
             // complete file.

  91         RequestHeadersSpec<?> request = getWebClient() //

  94                 .headers(headers -> headers.setBasicAuth(username, password)) //

  95                 .contentType(MediaType.APPLICATION_JSON) //

  97         return retrieve(request) //
 
 101     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
             // Builds a request with a JSON content type and passes it to retrieve(), which yields the
             // complete response entity.
             // NOTE(review): the calls that select the HTTP method, apply `uri` and attach `body` are not
             // visible in this listing - confirm against the complete file.

 102         RequestHeadersSpec<?> request = getWebClient() //

 105                 .contentType(MediaType.APPLICATION_JSON) //

 107         return retrieve(request);
 
 110     public Mono<ResponseEntity<String>> putForEntity(String uri) {
             // Overload without a body parameter: builds a request from getWebClient() and passes it to
             // retrieve().
             // NOTE(review): the remainder of the request-building chain (HTTP method, use of `uri`) is not
             // visible in this listing - confirm against the complete file.

 111         RequestHeadersSpec<?> request = getWebClient() //

 114         return retrieve(request);
 
 117     public Mono<String> put(String uri, String body) {
             // Delegates to putForEntity(uri, body) and returns a Mono<String>.
             // NOTE(review): the rest of this call chain is not visible in this listing; presumably it
             // converts the response entity to its body - confirm against the complete file.

 118         return putForEntity(uri, body) //
 
 122     public Mono<ResponseEntity<String>> getForEntity(String uri) {
             // Issues a GET for `uri` through the shared WebClient and returns the complete response
             // entity produced by retrieve().

 123         RequestHeadersSpec<?> request = getWebClient().get().uri(uri);

 124         return retrieve(request);
 
 127     public Mono<String> get(String uri) {
             // Delegates to getForEntity(uri) and returns a Mono<String>.
             // NOTE(review): the rest of this call chain is not visible in this listing; presumably it
             // converts the response entity to its body - confirm against the complete file.

 128         return getForEntity(uri) //
 
 132     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
             // Issues a DELETE for `uri` through the shared WebClient and returns the complete response
             // entity produced by retrieve().

 133         RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);

 134         return retrieve(request);
 
 137     public Mono<String> delete(String uri) {
             // Delegates to deleteForEntity(uri) and returns a Mono<String>.
             // NOTE(review): the rest of this call chain is not visible in this listing; presumably it
             // converts the response entity to its body - confirm against the complete file.

 138         return deleteForEntity(uri) //
 
 142     private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
             // Common final step for all request methods: optionally adds a Bearer token, executes the
             // request, converts the response to ResponseEntity<String>, and routes errors to onError().

             // Only add the Authorization header when the security context reports it is configured.
 143         if (securityContext.isConfigured()) {

                 // The value returned by headers(...) is not used; the call is made for its effect on `request`.
 144             request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));

 146         return request.retrieve() //

 147                 .toEntity(String.class) //

                     // doOnError only observes the error (for logging); the error signal still reaches the subscriber.
 148                 .doOnError(this::onError);
 
 152     private void onError(Throwable t) {
             // Error callback used by retrieve(): when the failure is a WebClientResponseException, the
             // response body is written to the debug log. No handling for other Throwable types is
             // visible in this listing.

 153         if (t instanceof WebClientResponseException) {

 154             WebClientResponseException e = (WebClientResponseException) t;

 155             logger.debug("Response error: {}", e.getResponseBodyAsString());
 
 159     private static Object createTraceTag() {
             // Returns the next value of the static sequenceNumber counter (boxed, since the return type
             // is Object). buildWebClient() puts this value in its debug log lines.

 160         return sequenceNumber.incrementAndGet();
 
 163     private String toBody(ResponseEntity<String> entity) {
             // Extracts the body string from a response entity, with separate handling for a null body.
             // NOTE(review): the statement executed when the body is null is not visible in this listing -
             // confirm what is returned in that case against the complete file.

 164         if (entity.getBody() == null) {

 167             return entity.getBody();
 
 171     private boolean isHttpProxyConfigured() {
             // True only when a proxy config object exists, its port is greater than zero, and its host
             // string is not empty. isEmpty() is called directly on the host, so a null host is not
             // guarded against here.

 172         return httpProxyConfig != null && httpProxyConfig.getHttpProxyPort() > 0

 173                 && !httpProxyConfig.getHttpProxyHost().isEmpty();
 
 176     private HttpClient buildHttpClient() {
             // Creates the Reactor Netty HttpClient: connect timeout, read/write timeout handlers, and
             // optional TLS and HTTP proxy settings taken from this instance's fields.
             // NOTE(review): the statement that returns the client is not visible in this listing.

 177         HttpClient httpClient = HttpClient.create() //

                     // Connection establishment timeout: 10_000 milliseconds.
 178                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //

 179                 .doOnConnected(connection -> {

                         // Both handlers are constructed with 30; Netty documents the int-argument
                         // constructors of these handlers as taking seconds.
 180                     connection.addHandlerLast(new ReadTimeoutHandler(30));

 181                     connection.addHandlerLast(new WriteTimeoutHandler(30));

             // TLS is enabled only when an SslContext was supplied to the constructor.
 184         if (this.sslContext != null) {

 185             httpClient = httpClient.secure(ssl -> ssl.sslContext(sslContext));

             // Route through an HTTP proxy only when isHttpProxyConfigured() accepts the proxy settings.
 188         if (isHttpProxyConfigured()) {

 189             httpClient = httpClient.proxy(proxy -> proxy.type(ProxyProvider.Proxy.HTTP)

 190                     .host(httpProxyConfig.getHttpProxyHost()).port(httpProxyConfig.getHttpProxyPort()));
 
 195     public WebClient buildWebClient(String baseUrl) {
             // Assembles a new WebClient on top of buildHttpClient(), with custom exchange strategies,
             // request/response debug-logging filters, and (when enabled) an OpenTelemetry tracing filter.
             // NOTE(review): the lines that apply `baseUrl` and register reqLogger/respLogger on the
             // builder are not visible in this listing - confirm against the complete file.

             // One tag per built client; it prefixes both debug log lines below.
 196         Object traceTag = createTraceTag();

 198         final HttpClient httpClient = buildHttpClient();

 199         ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //

                     // -1 is the value Spring documents as "no limit" for in-memory buffering of codecs.
 200                 .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //

             // Logs the HTTP method and URL at debug level and passes the request through unchanged.
             // NOTE(review): the format string ends with two single quotes ('{}''), which looks like a typo.
 203         ExchangeFilterFunction reqLogger = ExchangeFilterFunction.ofRequestProcessor(req -> {

 204             logger.debug("{} {} uri = '{}''", traceTag, req.method(), req.url());

 205             return Mono.just(req);

             // Logs the response status code at debug level and passes the response through unchanged.
 208         ExchangeFilterFunction respLogger = ExchangeFilterFunction.ofResponseProcessor(resp -> {

 209             logger.debug("{} resp: {}", traceTag, resp.statusCode());

 210             return Mono.just(resp);

 213         WebClient.Builder webClientBuilder = WebClient.builder()

 214                 .clientConnector(new ReactorClientHttpConnector(httpClient))

 216                 .exchangeStrategies(exchangeStrategies)

             // Tracing is optional: the telemetry bean is looked up only when OtelConfig enables tracing.
 220         if (otelConfig.isTracingEnabled()) {

 221             SpringWebfluxTelemetry webfluxTelemetry = ApplicationContextProvider.getApplicationContext().getBean(SpringWebfluxTelemetry.class);

 222             webClientBuilder.filters(webfluxTelemetry::addClientTracingFilter);

 225         return webClientBuilder.build();
 
 228     private WebClient getWebClient() {
             // Returns the cached WebClient, building it with buildWebClient(baseUrl) on first use.
             // No synchronization is visible around the null check and assignment.

 229         if (this.webClient == null) {

 230             this.webClient = buildWebClient(baseUrl);

 232         return this.webClient;