Create an evolution of HTTP Client 02/82802/7
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 20 Mar 2019 13:49:13 +0000 (14:49 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 22 Mar 2019 08:04:14 +0000 (09:04 +0100)
* simplify the API
* use new http client in old one for compatibility
* deprecate old one

Issue-ID: DCAEGEN2-1010
Change-Id: Ief681ba536a37b29c10d133c61a1326a003ed308
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
17 files changed:
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java [new file with mode: 0644]
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java [new file with mode: 0644]
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java [new file with mode: 0644]
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java [new file with mode: 0644]
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java [new file with mode: 0644]
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java [new file with mode: 0644]
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java [new file with mode: 0644]
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java [moved from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java with 97% similarity]
rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClientIT.java
rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java [new file with mode: 0644]

index 989bd2d..379daf9 100644 (file)
@@ -20,7 +20,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
 
 import org.jetbrains.annotations.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
@@ -53,7 +53,7 @@ public class CbsClientFactory {
      */
     public static @NotNull Mono<CbsClient> createCbsClient(EnvProperties env) {
         return Mono.defer(() -> {
-            final CloudHttpClient httpClient = new CloudHttpClient();
+            final RxHttpClient httpClient = RxHttpClient.create();
             final CbsLookup lookup = new CbsLookup(httpClient);
             return lookup.lookup(env)
                     .map(addr -> CbsClientImpl.create(httpClient, addr, env.appName()));
index 9be08e3..72c1b26 100644 (file)
@@ -24,22 +24,24 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import org.jetbrains.annotations.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Mono;
 
 public class CbsClientImpl implements CbsClient {
 
-    private final CloudHttpClient httpClient;
+    private final RxHttpClient httpClient;
     private final String fetchUrl;
 
-    CbsClientImpl(CloudHttpClient httpClient, URL fetchUrl) {
+    CbsClientImpl(RxHttpClient httpClient, URL fetchUrl) {
         this.httpClient = httpClient;
         this.fetchUrl = fetchUrl.toString();
     }
 
-    public static CbsClientImpl create(CloudHttpClient httpClient, InetSocketAddress cbsAddress, String serviceName) {
+    public static CbsClientImpl create(RxHttpClient httpClient, InetSocketAddress cbsAddress, String serviceName) {
         return new CbsClientImpl(httpClient, constructUrl(cbsAddress, serviceName));
     }
 
@@ -57,6 +59,14 @@ public class CbsClientImpl implements CbsClient {
 
     @Override
     public @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext) {
-        return Mono.defer(() -> httpClient.get(fetchUrl, diagnosticContext, JsonObject.class));
+        return Mono.defer(() -> {
+            final ImmutableHttpRequest request = ImmutableHttpRequest.builder()
+                    .method(HttpMethod.GET)
+                    .url(fetchUrl)
+                    .diagnosticContext(diagnosticContext)
+                    .build();
+            return httpClient.call(request)
+                    .map(resp -> resp.bodyAsJson(JsonObject.class));
+        });
     }
 }
index 89daebc..3d528c3 100644 (file)
@@ -23,7 +23,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import java.net.InetSocketAddress;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import reactor.core.publisher.Mono;
@@ -36,9 +39,9 @@ public class CbsLookup {
 
     private static final String CONSUL_JSON_SERVICE_ADDRESS = "ServiceAddress";
     private static final String CONSUL_JSON_SERVICE_PORT = "ServicePort";
-    private final CloudHttpClient httpClient;
+    private final RxHttpClient httpClient;
 
-    public CbsLookup(CloudHttpClient httpClient) {
+    public CbsLookup(RxHttpClient httpClient) {
         this.httpClient = httpClient;
     }
 
@@ -54,7 +57,13 @@ public class CbsLookup {
     }
 
     private Mono<JsonArray> fetchHttpData(String consulUrl) {
-        return httpClient.get(consulUrl, JsonArray.class);
+        return httpClient.call(
+                ImmutableHttpRequest.builder()
+                        .method(HttpMethod.GET)
+                        .url(consulUrl)
+                        .build())
+                .doOnNext(HttpResponse::throwIfUnsuccessful)
+                .map(resp -> resp.bodyAsJson(JsonArray.class));
     }
 
     private Mono<JsonObject> firstService(JsonArray services) {
index e2833fe..58e1e6c 100644 (file)
@@ -21,8 +21,8 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
 
 import com.google.gson.JsonObject;
 import io.vavr.collection.Map;
@@ -31,6 +31,7 @@ import java.time.Duration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
index 617904f..339b1ef 100644 (file)
@@ -22,7 +22,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -30,7 +29,13 @@ import static org.mockito.Mockito.verify;
 import com.google.gson.JsonObject;
 import java.net.InetSocketAddress;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.mockito.Mockito;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Mono;
 
@@ -39,7 +44,7 @@ import reactor.core.publisher.Mono;
  * @since February 2019
  */
 class CbsClientImplTest {
-    private final CloudHttpClient httpClient = mock(CloudHttpClient.class);
+    private final RxHttpClient httpClient = mock(RxHttpClient.class);
 
     @Test
     void shouldFetchUsingProperUrl() {
@@ -47,8 +52,12 @@ class CbsClientImplTest {
         InetSocketAddress cbsAddress = InetSocketAddress.createUnresolved("cbshost", 6969);
         String serviceName = "dcaegen2-ves-collector";
         final CbsClientImpl cut = CbsClientImpl.create(httpClient, cbsAddress, serviceName);
-        final JsonObject httpResponse = new JsonObject();
-        given(httpClient.get(anyString(), any(RequestDiagnosticContext.class), any(Class.class))).willReturn(Mono.just(httpResponse));
+        final HttpResponse httpResponse = ImmutableHttpResponse.builder()
+                .url("http://xxx")
+                .statusCode(200)
+                .rawBody("{}".getBytes())
+                .build();
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
         RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
 
         // when
@@ -56,7 +65,11 @@ class CbsClientImplTest {
 
         // then
         final String expectedUrl = "http://cbshost:6969/service_component/dcaegen2-ves-collector";
-        verify(httpClient).get(expectedUrl, diagnosticContext, JsonObject.class);
-        assertThat(result).isSameAs(httpResponse);
+        verify(httpClient).call(ImmutableHttpRequest.builder()
+                .method(HttpMethod.GET)
+                .url(expectedUrl)
+                .diagnosticContext(diagnosticContext)
+                .build());
+        assertThat(result.toString()).isEqualTo(httpResponse.bodyAsString());
     }
 }
\ No newline at end of file
index 94ff53f..e16605d 100644 (file)
@@ -22,8 +22,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
@@ -31,7 +34,11 @@ import com.google.gson.JsonParser;
 import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
@@ -49,7 +56,7 @@ class CbsLookupTest {
             .consulHost("consul.local")
             .consulPort(8050)
             .appName("whatever").build();
-    private final CloudHttpClient httpClient = mock(CloudHttpClient.class);
+    private final RxHttpClient httpClient = mock(RxHttpClient.class);
     private final CbsLookup cut = new CbsLookup(httpClient);
 
     @Test
@@ -63,6 +70,14 @@ class CbsLookupTest {
         // then
         assertThat(result.getHostString()).isEqualTo("config-binding-service");
         assertThat(result.getPort()).isEqualTo(10000);
+
+        final String url = "http://"
+                + env.consulHost()
+                + ":"
+                + env.consulPort()
+                + "/v1/catalog/service/"
+                + env.cbsName();
+        verifyHttpGetHasBeenCalled(url);
     }
 
     @Test
@@ -82,14 +97,24 @@ class CbsLookupTest {
     }
 
     private void givenConsulResponse(JsonArray jsonArray) {
-        final String url = "http://"
-                + env.consulHost()
-                + ":"
-                + env.consulPort()
-                + "/v1/catalog/service/"
-                + env.cbsName();
-        given(httpClient.get(url, JsonArray.class))
-                .willReturn(Mono.just(jsonArray));
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.just(ImmutableHttpResponse.builder()
+                        .url("http://xxx")
+                        .statusCode(200)
+                        .rawBody(jsonArray.toString().getBytes())
+                        .build()));
+    }
+
+    private void verifyHttpGetHasBeenCalled(String url) {
+        final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+        verify(httpClient).call(httpRequestArgumentCaptor.capture());
+        assertThat(httpRequestArgumentCaptor.getValue().url())
+                .describedAs("HTTP request URL")
+                .isEqualTo(url);
+        assertThat(httpRequestArgumentCaptor.getValue().method())
+                .describedAs("HTTP request method")
+                .isEqualTo(HttpMethod.GET);
     }
 
+
 }
\ No newline at end of file
index ac790cb..e83a069 100644 (file)
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
 import com.google.gson.Gson;
-import io.netty.handler.codec.http.HttpStatusClass;
 import io.netty.handler.ssl.SslContext;
-import io.vavr.collection.Stream;
-import java.io.IOException;
+import io.vavr.collection.HashMap;
 import java.util.Collections;
 import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 import org.onap.dcaegen2.services.sdk.rest.services.model.ClientModel;
 import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
-import reactor.netty.ByteBufFlux;
 import reactor.netty.http.client.HttpClient;
-import reactor.netty.http.client.HttpClientRequest;
 import reactor.netty.http.client.HttpClientResponse;
 
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 11/15/18
+ * @deprecated use {@link RxHttpClient} instead
  */
-
+@Deprecated
 public class CloudHttpClient {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(CloudHttpClient.class);
     private final Gson gson = new Gson();
-    private final HttpClient httpClient;
-
-    public CloudHttpClient() {
-        this(HttpClient.create());
-    }
+    private final RxHttpClient httpClient;
 
-    public CloudHttpClient(SslContext sslContext) {
-        this(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
-    }
-
-    CloudHttpClient(HttpClient httpClient) {
+    CloudHttpClient(RxHttpClient httpClient) {
         this.httpClient = httpClient;
-
     }
 
-
-
-    @Deprecated
-    public <T> Mono<T> get(String url, RequestDiagnosticContext context, Class<T> bodyClass) {
-        return get(url, context, Collections.EMPTY_MAP, bodyClass);
+    public CloudHttpClient() {
+        this(RxHttpClient.create());
     }
 
-    @Deprecated
-    public <T> Mono<T> get(String url, RequestDiagnosticContext context, Map<String, String> customHeaders,
-        Class<T> bodyClass) {
-        final HttpClient clientWithHeaders = getHttpClientWithHeaders(context, customHeaders);
-        return callHttpGet(clientWithHeaders, url, bodyClass);
+    public CloudHttpClient(SslContext sslContext) {
+        this(RxHttpClient.create(sslContext));
     }
 
     public <T> Mono<T> get(String url, Class<T> bodyClass) {
-        return callHttpGet(httpClient, url, bodyClass);
-    }
-
-    public Mono<HttpClientResponse> post(String url, RequestDiagnosticContext context, Map<String, String> customHeaders,
-        JsonBodyBuilder jsonBodyBuilder, ClientModel clientModel) {
-        final HttpClient clientWithHeaders = getHttpClientWithHeaders(context, customHeaders);
-        return callHttpPost(clientWithHeaders, url, jsonBodyBuilder, clientModel);
+        return get(url, RequestDiagnosticContext.create(), bodyClass);
     }
 
-    public Mono<HttpClientResponse> patch(String url, RequestDiagnosticContext context, Map<String, String> customHeaders,
-        JsonBodyBuilder jsonBodyBuilder, ClientModel clientModel) {
-        final HttpClient clientWithHeaders = getHttpClientWithHeaders(context, customHeaders);
-        return callHttpPatch(clientWithHeaders, url, jsonBodyBuilder, clientModel);
-    }
-
-    private HttpClient getHttpClientWithHeaders(RequestDiagnosticContext context, Map<String, String> customHeaders) {
-        final HttpClient clientWithHeaders = httpClient
-            .doOnRequest((req, conn) -> logRequest(context, req))
-            .doOnResponse((rsp, conn) -> logResponse(context, rsp))
-            .headers(hdrs -> context.remoteCallHttpHeaders().forEach((BiConsumer<String, String>) hdrs::set))
-            .headers(hdrs -> customHeaders.forEach(hdrs::set));
-        return clientWithHeaders;
-    }
-
-    private <T> Mono<T> callHttpGet(HttpClient client, String url, Class<T> bodyClass) {
-        return client.get()
-            .uri(url)
-            .responseSingle((resp, content) -> HttpStatusClass.SUCCESS.contains(resp.status().code())
-                ? content.asString()
-                : Mono.error(createException(url, resp)))
-            .map(body -> parseJson(body, bodyClass));
-    }
-
-    private <T extends ClientModel> Mono<HttpClientResponse> callHttpPost(HttpClient client, String url,
-        JsonBodyBuilder<T> jsonBodyBuilder, T clientModel) {
-        return client.baseUrl(url).post()
-            .send(ByteBufFlux.fromString(Mono.just(jsonBodyBuilder.createJsonBody(clientModel))))
-            .responseSingle((httpClientResponse, byteBufMono) -> Mono.just(httpClientResponse));
-    }
-
-    private <T extends ClientModel> Mono<HttpClientResponse> callHttpPatch(HttpClient client, String url,
-        JsonBodyBuilder<T> jsonBodyBuilder, T clientModel) {
-        String jsonBodyRequest = jsonBodyBuilder.createJsonBody(clientModel);
-        LOGGER.debug( String.format("Json body request: %s ",jsonBodyRequest));
-        return client.baseUrl(url).patch()
-            .send(ByteBufFlux.fromString(Mono.just(jsonBodyRequest)))
-            .responseSingle((httpClientResponse, byteBufMono) -> Mono.just(httpClientResponse));
-    }
-
-    private Exception createException(String url, HttpClientResponse response) {
-        return new IOException(String.format("Request failed for URL '%s'. Response code: %s",
-            url,
-            response.status()));
-    }
-
-    private <T> T parseJson(String body, Class<T> bodyClass) {
-        return gson.fromJson(body, bodyClass);
-    }
-
-    private void logRequest(RequestDiagnosticContext context, HttpClientRequest httpClientRequest) {
-        context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> {
-            LOGGER.debug("Request: {} {} {}", httpClientRequest.method(), httpClientRequest.uri(), httpClientRequest.requestHeaders());
-            if (LOGGER.isTraceEnabled()) {
-                final String headers = Stream.ofAll(httpClientRequest.requestHeaders())
-                    .map(entry -> entry.getKey() + "=" + entry.getValue())
-                    .collect(Collectors.joining("\n"));
-                LOGGER.trace(headers);
-            }
-        });
+    public <T> Mono<T> get(String url, RequestDiagnosticContext context, Class<T> bodyClass) {
+        return get(url, context, Collections.emptyMap(), bodyClass);
+    }
+
+    public <T> Mono<T> get(
+            String url,
+            RequestDiagnosticContext context,
+            Map<String, String> customHeaders,
+            Class<T> bodyClass) {
+        return httpClient.call(
+                ImmutableHttpRequest.builder()
+                        .method(HttpMethod.GET)
+                        .url(url)
+                        .customHeaders(HashMap.ofAll(customHeaders))
+                        .diagnosticContext(context)
+                        .build())
+                .doOnNext(HttpResponse::throwIfUnsuccessful)
+                .map(HttpResponse::bodyAsString)
+                .map(body -> gson.fromJson(body, bodyClass));
+    }
+
+
+    public Mono<HttpClientResponse> post(
+            String url,
+            RequestDiagnosticContext context,
+            Map<String, String> customHeaders,
+            JsonBodyBuilder jsonBodyBuilder,
+            ClientModel clientModel) {
+        return callForRawResponse(url, context, customHeaders, jsonBodyBuilder, clientModel, HttpMethod.POST);
+    }
+
+    public Mono<HttpClientResponse> patch(
+            String url,
+            RequestDiagnosticContext context,
+            Map<String, String> customHeaders,
+            JsonBodyBuilder jsonBodyBuilder,
+            ClientModel clientModel) {
+        return callForRawResponse(url, context, customHeaders, jsonBodyBuilder, clientModel, HttpMethod.PATCH);
+    }
+
+
+    private Mono<HttpClientResponse> callForRawResponse(
+            String url,
+            RequestDiagnosticContext context,
+            Map<String, String> customHeaders,
+            JsonBodyBuilder jsonBodyBuilder,
+            ClientModel clientModel,
+            HttpMethod method) {
+        return httpClient.prepareRequest(
+                ImmutableHttpRequest.builder()
+                        .url(url)
+                        .customHeaders(HashMap.ofAll(customHeaders))
+                        .diagnosticContext(context)
+                        .body(RequestBody.fromString(jsonBodyBuilder.createJsonBody(clientModel)))
+                        .method(method)
+                        .build())
+                .responseSingle((httpClientResponse, byteBufMono) -> Mono.just(httpClientResponse));
     }
 
-    private void logResponse(RequestDiagnosticContext context, HttpClientResponse httpClientResponse) {
-        context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> {
-            LOGGER.debug("Response status: {}", httpClientResponse.status());
-        });
-    }
 }
 
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java
new file mode 100644 (file)
index 0000000..78e6789
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public enum HttpMethod {
+
+    CONNECT(io.netty.handler.codec.http.HttpMethod.CONNECT),
+    DELETE(io.netty.handler.codec.http.HttpMethod.DELETE),
+    GET(io.netty.handler.codec.http.HttpMethod.GET),
+    HEAD(io.netty.handler.codec.http.HttpMethod.HEAD),
+    OPTIONS(io.netty.handler.codec.http.HttpMethod.OPTIONS),
+    POST(io.netty.handler.codec.http.HttpMethod.POST),
+    PATCH(io.netty.handler.codec.http.HttpMethod.PATCH),
+    PUT(io.netty.handler.codec.http.HttpMethod.PUT),
+    TRACE(io.netty.handler.codec.http.HttpMethod.TRACE);
+
+    private final io.netty.handler.codec.http.HttpMethod nettyMethod;
+
+    HttpMethod(io.netty.handler.codec.http.HttpMethod nettyMethod) {
+        this.nettyMethod = nettyMethod;
+    }
+
+    io.netty.handler.codec.http.HttpMethod asNetty() {
+        return nettyMethod;
+    }
+}
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java
new file mode 100644 (file)
index 0000000..7866083
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+import io.netty.buffer.ByteBuf;
+import io.vavr.collection.HashMap;
+import io.vavr.collection.Map;
+import java.util.function.BiFunction;
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+import reactor.netty.NettyOutbound;
+import reactor.netty.http.client.HttpClientRequest;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+public interface HttpRequest {
+
+    String url();
+
+    HttpMethod method();
+
+    @Value.Default
+    default RequestDiagnosticContext diagnosticContext() {
+        return RequestDiagnosticContext.create();
+    }
+
+    @Value.Default
+    default Map<String, String> customHeaders() {
+        return HashMap.empty();
+    }
+
+    @Value.Default
+    default Publisher<ByteBuf> body() {
+        return Mono.empty();
+    }
+
+    @Value.Derived
+    default Map<String, String> headers() {
+        final RequestDiagnosticContext ctx = diagnosticContext();
+        return ctx == null
+                ? customHeaders()
+                : customHeaders().merge(ctx.remoteCallHttpHeaders());
+    }
+}
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java
new file mode 100644 (file)
index 0000000..ce10047
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+import com.google.gson.Gson;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+public interface HttpResponse {
+
+    String url();
+
+    int statusCode();
+
+    byte[] rawBody();
+
+    @Value.Default
+    default String statusReason() {
+        return "";
+    }
+
+    @Value.Derived
+    default boolean successful() {
+        return statusCode() >= 200 && statusCode() < 300;
+    }
+
+    @Value.Derived
+    default String bodyAsString() {
+        return bodyAsString(StandardCharsets.UTF_8);
+    }
+
+    @Value.Derived
+    default String bodyAsString(Charset charset) {
+        return new String(rawBody(), charset);
+    }
+
+    @Value.Derived
+    default <T> T bodyAsJson(Class<T> clazz) {
+        return bodyAsJson(StandardCharsets.UTF_8, new Gson(), clazz);
+    }
+
+    @Value.Derived
+    default <T> T bodyAsJson(Charset charset, Gson gson, Class<T> clazz) {
+        return gson.fromJson(bodyAsString(charset), clazz);
+    }
+
+    default void throwIfUnsuccessful() {
+        if (!successful()) {
+            throw new HttpException(url(), statusCode(), statusReason());
+        }
+    }
+}
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java
new file mode 100644 (file)
index 0000000..3dcd709
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpStatusClass;
+import java.nio.charset.Charset;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class NettyHttpResponse implements HttpResponse {
+
+    private final String url;
+    private final HttpResponseStatus status;
+    private final byte[] body;
+
+    NettyHttpResponse(String url, HttpResponseStatus status, byte[] body) {
+        this.url = url;
+        this.status = status;
+        this.body = body;
+    }
+
+    @Override
+    public String url() {
+        return url;
+    }
+
+    @Override
+    public boolean successful() {
+        return status.codeClass() == HttpStatusClass.SUCCESS;
+    }
+
+    @Override
+    public int statusCode() {
+        return status.code();
+    }
+
+    @Override
+    public String statusReason() {
+        return status.reasonPhrase();
+    }
+
+    @Override
+    public byte[] rawBody() {
+        return new byte[0];
+    }
+
+    @Override
+    public String bodyAsString(Charset charset) {
+        return new String(body, charset);
+    }
+
+}
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java
new file mode 100644 (file)
index 0000000..514ea0b
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+import com.google.gson.JsonElement;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+import reactor.netty.ByteBufFlux;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public final class RequestBody {
+
+    private RequestBody() {
+    }
+
+    public static Publisher<ByteBuf> fromString(String contents) {
+        return fromString(contents, StandardCharsets.UTF_8);
+    }
+
+    public static Publisher<ByteBuf> fromString(String contents, Charset charset) {
+        return ByteBufFlux.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT);
+    }
+
+    public static Publisher<ByteBuf> fromJson(JsonElement contents) {
+        return fromString(contents.toString());
+    }
+
+}
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
new file mode 100644 (file)
index 0000000..f384c1c
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+import io.netty.handler.ssl.SslContext;
+import io.vavr.collection.Stream;
+import java.util.stream.Collectors;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClient.ResponseReceiver;
+import reactor.netty.http.client.HttpClientRequest;
+import reactor.netty.http.client.HttpClientResponse;
+
+/**
+ * @since 1.1.4
+ */
+public class RxHttpClient {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
+    private final HttpClient httpClient;
+
+    public static RxHttpClient create() {
+        return new RxHttpClient(HttpClient.create());
+    }
+
+    // TODO: hide netty from public api (io.netty.handler.ssl.SslContext)
+    public static RxHttpClient create(SslContext sslContext) {
+        return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
+    }
+
+    RxHttpClient(HttpClient httpClient) {
+        this.httpClient = httpClient;
+    }
+
+    public Mono<HttpResponse> call(HttpRequest request) {
+        return prepareRequest(request)
+                .responseSingle((resp, content) ->
+                        content.asByteArray()
+                                .defaultIfEmpty(new byte[0])
+                                .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes)));
+    }
+
+    ResponseReceiver<?> prepareRequest(HttpRequest request) {
+        return httpClient
+                .doOnRequest((req, conn) -> logRequest(request.diagnosticContext(), req))
+                .doOnResponse((rsp, conn) -> logResponse(request.diagnosticContext(), rsp))
+                .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2)))
+                .request(request.method().asNetty())
+                .send(request.body())
+                .uri(request.url());
+
+    }
+
+    private void logRequest(RequestDiagnosticContext context, HttpClientRequest httpClientRequest) {
+        context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> {
+            LOGGER.debug("Request: {} {} {}", httpClientRequest.method(), httpClientRequest.uri(),
+                    httpClientRequest.requestHeaders());
+            if (LOGGER.isTraceEnabled()) {
+                final String headers = Stream.ofAll(httpClientRequest.requestHeaders())
+                        .map(entry -> entry.getKey() + "=" + entry.getValue())
+                        .collect(Collectors.joining("\n"));
+                LOGGER.trace(headers);
+            }
+        });
+    }
+
+    private void logResponse(RequestDiagnosticContext context, HttpClientResponse httpClientResponse) {
+        context.withSlf4jMdc(LOGGER.isDebugEnabled(),
+                () -> LOGGER.debug("Response status: {}", httpClientResponse.status()));
+    }
+}
diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java
new file mode 100644 (file)
index 0000000..9631f4c
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public class HttpException extends RuntimeException {
+
+    private final String url;
+    private final int responseCode;
+    private final String reason;
+
+    public HttpException(String url, int responseCode, String reason) {
+        this.url = url;
+        this.responseCode = responseCode;
+        this.reason = reason;
+    }
+
+    @Override
+    public String getMessage() {
+        return String.format("Request failed for URL '%s'. Response code: %d %s",
+                url,
+                responseCode,
+                reason);
+    }
+}
index 936ef0c..a913a93 100644 (file)
@@ -42,26 +42,28 @@ import reactor.test.StepVerifier;
 import reactor.netty.http.client.HttpClientResponse;
 
 class CloudHttpClientIT {
+
     private static final int MAX_CONNECTIONS = 1;
     private static final String SAMPLE_STRING = "sampleString";
     private static final String SAMPLE_URL = "/sampleURL";
     private static final String JSON_BODY = "{\"correlationId\":\"NOKnhfsadhff\","
-        + "\"ipaddress-v4\":\"256.22.33.155\", "
-        + "\"ipaddress-v6\":\"200J:0db8:85a3:0000:0000:8a2e:0370:7334\"}";
+            + "\"ipaddress-v4\":\"256.22.33.155\", "
+            + "\"ipaddress-v6\":\"200J:0db8:85a3:0000:0000:8a2e:0370:7334\"}";
     private static final ConnectionProvider connectionProvider = ConnectionProvider.fixed("test", MAX_CONNECTIONS);
 
-    private DmaapModel dmaapModel = mock(DmaapModel.class);
-    private JsonBodyBuilder<DmaapModel> jsonBodyBuilder = mock(JsonBodyBuilder.class);
+    private final DmaapModel dmaapModel = mock(DmaapModel.class);
+    private final JsonBodyBuilder<DmaapModel> jsonBodyBuilder = mock(JsonBodyBuilder.class);
 
     @Test
     void successfulPatchResponse() {
         DisposableServer server = createValidServer();
-        HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider);
+        RxHttpClient httpClient = createHttpClientForContextWithAddress(server);
         CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient);
 
         when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY);
-        Mono<HttpClientResponse> content = cloudHttpClient.patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
-            jsonBodyBuilder, dmaapModel);
+        Mono<HttpClientResponse> content = cloudHttpClient
+                .patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
+                        jsonBodyBuilder, dmaapModel);
         HttpClientResponse httpClientResponse = content.block();
 
         assertEquals(HttpResponseStatus.OK, httpClientResponse.status());
@@ -71,12 +73,13 @@ class CloudHttpClientIT {
     @Test
     void errorPatchRequest() {
         DisposableServer server = createInvalidServer();
-        HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider);
+        RxHttpClient httpClient = createHttpClientForContextWithAddress(server);
         CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient);
 
         when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY);
-        Mono<HttpClientResponse> content = cloudHttpClient.patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
-            jsonBodyBuilder, dmaapModel);
+        Mono<HttpClientResponse> content = cloudHttpClient
+                .patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
+                        jsonBodyBuilder, dmaapModel);
         HttpClientResponse httpClientResponse = content.block();
 
         assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, httpClientResponse.status());
@@ -86,12 +89,13 @@ class CloudHttpClientIT {
     @Test
     void successfulPostResponse() {
         DisposableServer server = createValidServer();
-        HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider);
+        RxHttpClient httpClient = createHttpClientForContextWithAddress(server);
         CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient);
 
         when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY);
-        Mono<HttpClientResponse> content = cloudHttpClient.post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
-            jsonBodyBuilder, dmaapModel);
+        Mono<HttpClientResponse> content = cloudHttpClient
+                .post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
+                        jsonBodyBuilder, dmaapModel);
         HttpClientResponse httpClientResponse = content.block();
 
         assertEquals(HttpResponseStatus.OK, httpClientResponse.status());
@@ -101,12 +105,13 @@ class CloudHttpClientIT {
     @Test
     void errorPostRequest() {
         DisposableServer server = createInvalidServer();
-        HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider);
+        RxHttpClient httpClient = createHttpClientForContextWithAddress(server);
         CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient);
 
         when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY);
-        Mono<HttpClientResponse> content = cloudHttpClient.post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
-            jsonBodyBuilder, dmaapModel);
+        Mono<HttpClientResponse> content = cloudHttpClient
+                .post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(),
+                        jsonBodyBuilder, dmaapModel);
         HttpClientResponse httpClientResponse = content.block();
 
         assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, httpClientResponse.status());
@@ -116,36 +121,36 @@ class CloudHttpClientIT {
     @Test
     void successfulGetResponse() {
         DisposableServer server = createValidServer();
-        HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider);
+        RxHttpClient httpClient = createHttpClientForContextWithAddress(server);
         CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient);
 
         when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY);
         Mono<String> content = cloudHttpClient.get(SAMPLE_URL, String.class);
         Mono<String> contentWithHeaders = cloudHttpClient.get(SAMPLE_URL, createRequestDiagnosticContext(),
-            createCustomHeaders(), String.class);
+                createCustomHeaders(), String.class);
 
         StepVerifier.create(content)
-            .expectNext(SAMPLE_STRING)
-            .expectComplete()
-            .verify();
+                .expectNext(SAMPLE_STRING)
+                .expectComplete()
+                .verify();
         StepVerifier.create(contentWithHeaders)
-            .expectNext(SAMPLE_STRING)
-            .expectComplete()
-            .verify();
+                .expectNext(SAMPLE_STRING)
+                .expectComplete()
+                .verify();
         server.disposeNow();
     }
 
     @Test
     void errorGetRequest() {
         DisposableServer server = createInvalidServer();
-        HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider);
+        RxHttpClient httpClient = createHttpClientForContextWithAddress(server);
         CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient);
 
         Mono<String> content = cloudHttpClient.get(SAMPLE_URL, String.class);
 
         StepVerifier.create(content)
-            .expectError()
-            .verify();
+                .expectError()
+                .verify();
         server.disposeNow();
     }
 
@@ -158,26 +163,27 @@ class CloudHttpClientIT {
     private DisposableServer createValidServer() {
         Mono<String> response = Mono.just(SAMPLE_STRING);
         return HttpServer.create()
-            .handle((req, resp) -> resp.sendString(response))
-            .wiretap(true)
-            .bindNow();
+                .handle((req, resp) -> resp.sendString(response))
+                .wiretap(true)
+                .bindNow();
     }
 
     private DisposableServer createInvalidServer() {
         return HttpServer.create()
-            .handle((req, resp) -> Mono.error(new Exception("returnError")))
-            .wiretap(true)
-            .bindNow();
+                .handle((req, resp) -> Mono.error(new Exception("returnError")))
+                .wiretap(true)
+                .bindNow();
     }
 
     private RequestDiagnosticContext createRequestDiagnosticContext() {
         return ImmutableRequestDiagnosticContext.builder()
-            .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build();
+                .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build();
     }
 
-    private HttpClient createHttpClientForContextWithAddress(DisposableServer disposableServer,
-        ConnectionProvider connectionProvider) {
-        HttpClient client = connectionProvider == null? HttpClient.create() : HttpClient.create(connectionProvider);
-        return client.addressSupplier(disposableServer::address).wiretap(true);
+    private RxHttpClient createHttpClientForContextWithAddress(DisposableServer disposableServer) {
+        HttpClient client = HttpClient.create(connectionProvider)
+                .addressSupplier(disposableServer::address)
+                .wiretap(true);
+        return new RxHttpClient(client);
     }
 }
\ No newline at end of file
diff --git a/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
new file mode 100644 (file)
index 0000000..5ae62c8
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Duration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class RxHttpClientIT {
+
+    private static final Duration TIMEOUT = Duration.ofHours(5);
+    private final RxHttpClient cut = RxHttpClient.create();
+    private static DummyHttpServer httpServer;
+
+    @BeforeAll
+    static void setUpClass() {
+        httpServer = DummyHttpServer.start(routes ->
+                routes.get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK")))
+                        .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
+                        .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
+        );
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        httpServer.close();
+    }
+
+    private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
+        return ImmutableHttpRequest.builder()
+                .url(new URL("http", httpServer.host(), httpServer.port(), path).toString());
+    }
+
+    @Test
+    void simpleGet() throws Exception {
+        // given
+        final HttpRequest httpRequest = requestFor("/sample-get").method(HttpMethod.GET).build();
+
+        // when
+        final Mono<String> bodyAsString = cut.call(httpRequest)
+                .doOnNext(HttpResponse::throwIfUnsuccessful)
+                .map(HttpResponse::bodyAsString);
+
+        // then
+        StepVerifier.create(bodyAsString).expectNext("OK").expectComplete().verify(TIMEOUT);
+    }
+
+    @Test
+    void getWithError() throws Exception {
+        // given
+        final HttpRequest httpRequest = requestFor("/sample-get-500").method(HttpMethod.GET).build();
+
+        // when
+        final Mono<String> bodyAsString = cut.call(httpRequest)
+                .doOnNext(HttpResponse::throwIfUnsuccessful)
+                .map(HttpResponse::bodyAsString);
+
+        // then
+        StepVerifier.create(bodyAsString).expectError(HttpException.class).verify(TIMEOUT);
+    }
+
+    @Test
+    void simplePost() throws Exception {
+        // given
+        final String requestBody = "hello world";
+        final HttpRequest httpRequest = requestFor("/echo-post")
+                .method(HttpMethod.POST)
+                .body(RequestBody.fromString(requestBody))
+                .build();
+
+        // when
+        final Mono<String> bodyAsString = cut.call(httpRequest)
+                .doOnNext(HttpResponse::throwIfUnsuccessful)
+                .map(HttpResponse::bodyAsString);
+
+        // then
+        StepVerifier.create(bodyAsString)
+                .expectNext(requestBody)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+}
\ No newline at end of file