2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
23 import io.netty.handler.codec.http.HttpResponseStatus;
24 import io.netty.handler.timeout.ReadTimeoutException;
26 import io.vavr.collection.HashSet;
27 import org.junit.jupiter.api.AfterAll;
28 import org.junit.jupiter.api.Test;
29 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableConnectionPoolConfig;
30 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
35 import reactor.core.publisher.Mono;
36 import reactor.test.StepVerifier;
38 import java.net.ConnectException;
39 import java.net.MalformedURLException;
41 import java.time.Duration;
42 import java.util.concurrent.atomic.AtomicInteger;
44 import static org.assertj.core.api.Assertions.assertThat;
45 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
46 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
48 class RxHttpClientIT {
50 private static final Duration TIMEOUT = Duration.ofHours(5);
51 private static final Duration NO_DELAY = Duration.ofSeconds(0);
52 private static final int RETRY_COUNT = 1;
53 private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
54 private static final DummyHttpServer HTTP_SERVER = initialize();
55 private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
56 private static final Mono<String> OK = Mono.just("OK");
57 private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
58 private static AtomicInteger REQUEST_COUNTER;
// Starts a DummyHttpServer and registers the routes exercised by this class.
// Plain routes answer directly; the "/delay-get" and "/retry-get-*" routes hand
// REQUEST_COUNTER and a list of Tuple.of(resp, <int>, <Duration>) entries to
// sendInOrderWithDelay (presumably one scripted status/delay per successive
// request - confirm against DummyHttpServer).
// NOTE(review): the end of the start(...) call and of the method is not visible
// in this listing.
60 private static DummyHttpServer initialize() {
61 return DummyHttpServer.start(routes -> routes
// Sends the shared OK body.
62 .get("/sample-get", (req, resp) -> sendString(resp, OK))
// Single scripted entry with a 3 second delay; target of the timeout test.
63 .get("/delay-get", (req, resp) ->
64 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
// Always replies with INTERNAL_SERVER_ERROR and no body.
65 .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
// Two scripted 500 entries, no delay.
66 .get("/retry-get-500", (req, resp) ->
67 sendInOrderWithDelay(REQUEST_COUNTER,
68 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
// One scripted 400 entry, no delay.
69 .get("/retry-get-400", (req, resp) ->
70 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
// A scripted 500 entry followed by a 200 entry, no delay.
71 .get("/retry-get-500-200", (req, resp) ->
72 sendInOrderWithDelay(REQUEST_COUNTER,
73 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
// One scripted 200 entry, no delay.
74 .get("/retry-get-200", (req, resp) ->
75 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
// Replies with the string form of the received request headers.
76 .post("/headers-post", (req, resp) -> resp
77 .sendString(Mono.just(req.requestHeaders().toString())))
// Echoes the received request body back to the caller.
78 .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
// Class-level teardown hook.
// NOTE(review): only the signature is visible in this listing; the body and any
// annotation (AfterAll is imported, so presumably @AfterAll) must be confirmed
// against the full file.
83 static void tearDownClass() {
// Issues GET /sample-get through a client from the no-arg
// RxHttpClientFactory.create(), runs HttpResponse::throwIfUnsuccessful on the
// response and maps it to its body string, which is then handed to StepVerifier.
// NOTE(review): the end of the request builder chain and the StepVerifier
// expectations are not visible in this listing.
88 void simpleGet() throws Exception {
90 final HttpRequest httpRequest = requestFor("/sample-get")
91 .method(HttpMethod.GET)
93 final RxHttpClient cut = RxHttpClientFactory.create();
// Nothing is sent until the Mono is subscribed by StepVerifier below.
96 final Mono<String> bodyAsString = cut.call(httpRequest)
97 .doOnNext(HttpResponse::throwIfUnsuccessful)
98 .map(HttpResponse::bodyAsString);
101 StepVerifier.create(bodyAsString)
// Issues GET /sample-get-500 (a route that replies INTERNAL_SERVER_ERROR) and
// expects the pipeline, which applies HttpResponse::throwIfUnsuccessful, to
// terminate with HttpException.
// NOTE(review): the end of the request builder chain and the final verify step
// are not visible in this listing.
108 void getWithError() throws Exception {
110 final HttpRequest httpRequest = requestFor("/sample-get-500")
111 .method(HttpMethod.GET)
113 final RxHttpClient cut = RxHttpClientFactory.create();
116 final Mono<String> bodyAsString = cut.call(httpRequest)
117 .doOnNext(HttpResponse::throwIfUnsuccessful)
118 .map(HttpResponse::bodyAsString);
121 StepVerifier.create(bodyAsString)
122 .expectError(HttpException.class)
// Posts "hello world" to /echo-post (a route that sends the received body back)
// and expects the response body string to equal the request body.
// NOTE(review): the end of the request builder chain and the remaining
// StepVerifier steps are not visible in this listing.
127 void simplePost() throws Exception {
129 final String requestBody = "hello world";
130 final HttpRequest httpRequest = requestFor("/echo-post")
131 .method(HttpMethod.POST)
132 .body(RequestBody.fromString(requestBody))
134 final RxHttpClient cut = RxHttpClientFactory.create();
137 final Mono<String> bodyAsString = cut.call(httpRequest)
138 .doOnNext(HttpResponse::throwIfUnsuccessful)
139 .map(HttpResponse::bodyAsString);
142 StepVerifier.create(bodyAsString)
// The echoed body must match what was sent.
143 .expectNext(requestBody)
// Posts a body created with RequestBody.chunkedFromString to /headers-post,
// which replies with the request headers as text, and checks on the lower-cased
// reply that "transfer-encoding: chunked" is present and "content-length" is
// absent.
// NOTE(review): the end of the request builder chain, the close of the
// consumer lambda and the final verify step are not visible in this listing.
149 void testChunkedEncoding() throws Exception {
151 final String requestBody = "hello world";
152 final HttpRequest httpRequest = requestFor("/headers-post")
153 .method(HttpMethod.POST)
154 .body(RequestBody.chunkedFromString(Mono.just(requestBody)))
156 final RxHttpClient cut = RxHttpClientFactory.create();
159 final Mono<String> bodyAsString = cut.call(httpRequest)
160 .doOnNext(HttpResponse::throwIfUnsuccessful)
161 .map(HttpResponse::bodyAsString);
// Lower-casing makes the header-name checks case-insensitive.
164 StepVerifier.create(bodyAsString.map(String::toLowerCase))
165 .consumeNextWith(responseBody -> {
166 assertThat(responseBody).contains("transfer-encoding: chunked");
167 assertThat(responseBody).doesNotContain("content-length");
// Counterpart of the chunked test: posts a body created with
// RequestBody.fromString to /headers-post and checks on the lower-cased reply
// that "transfer-encoding" is absent and "content-length" is present.
// NOTE(review): the end of the request builder chain, the close of the
// consumer lambda and the final verify step are not visible in this listing.
174 void testUnchunkedEncoding() throws Exception {
176 final String requestBody = "hello world";
177 final HttpRequest httpRequest = requestFor("/headers-post")
178 .method(HttpMethod.POST)
179 .body(RequestBody.fromString(requestBody))
181 final RxHttpClient cut = RxHttpClientFactory.create();
184 final Mono<String> bodyAsString = cut.call(httpRequest)
185 .doOnNext(HttpResponse::throwIfUnsuccessful)
186 .map(HttpResponse::bodyAsString);
// Lower-casing makes the header-name checks case-insensitive.
189 StepVerifier.create(bodyAsString.map(String::toLowerCase))
190 .consumeNextWith(responseBody -> {
191 assertThat(responseBody).doesNotContain("transfer-encoding");
192 assertThat(responseBody).contains("content-length");
// Issues GET /delay-get (scripted with a 3 second delay) using a request
// timeout of 1 ms, expects ReadTimeoutException, and then asserts that
// REQUEST_COUNTER is still zero.
// NOTE(review): the end of the request builder chain and the final verify step
// are not visible in this listing.
199 void getWithTimeoutError() throws Exception {
// Fresh counter so the assertion at the end is independent of other tests.
201 REQUEST_COUNTER = new AtomicInteger();
202 final HttpRequest httpRequest = requestFor("/delay-get")
203 .method(HttpMethod.GET)
204 .timeout(Duration.ofMillis(1))
206 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
209 final Mono<HttpResponse> response = cut.call(httpRequest);
212 StepVerifier.create(response)
213 .expectError(ReadTimeoutException.class)
215 assertNoServerResponse();
// Targets DISPOSED_HTTP_SERVER with a client whose retry config lists
// ConnectException as a custom retryable exception, expects the call to end
// with ConnectException, and asserts that REQUEST_COUNTER is still zero.
// NOTE(review): the ends of the request and client builder chains and the
// final verify step are not visible in this listing.
219 void getWithConnectExceptionWhenClosedServer() throws Exception {
221 REQUEST_COUNTER = new AtomicInteger();
222 final HttpRequest httpRequest = requestForClosedServer("/sample-get")
223 .method(HttpMethod.GET)
225 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
226 .retryConfig(defaultRetryConfig()
227 .customRetryableExceptions(HashSet.of(ConnectException.class))
232 final Mono<HttpResponse> response = cut.call(httpRequest);
235 StepVerifier.create(response)
236 .expectError(ConnectException.class)
238 assertNoServerResponse();
// Issues GET /retry-get-500 (scripted with two 500 entries) through a client
// whose retry config treats HTTP 500 as retryable, and expects the call to end
// with RetryableException.
// NOTE(review): the ends of the request and client builder chains, the final
// verify step and any request-count assertion are not visible in this listing.
242 void getWithRetryableExceptionWhen500() throws Exception {
244 REQUEST_COUNTER = new AtomicInteger();
245 final HttpRequest httpRequest = requestFor("/retry-get-500")
246 .method(HttpMethod.GET)
248 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
249 .retryConfig(defaultRetryConfig()
250 .retryableHttpResponseCodes(HashSet.of(500))
255 final Mono<HttpResponse> response = cut.call(httpRequest);
258 StepVerifier.create(response)
259 .expectError(RetryableException.class)
// Issues GET /retry-get-500-200 (scripted with a 500 entry followed by a 200
// entry) through a client whose retry config treats HTTP 500 as retryable, and
// hands the resulting body string to StepVerifier.
// NOTE(review): the ends of the request and client builder chains, the
// StepVerifier expectations and any request-count assertion are not visible in
// this listing.
265 void getWithRetryWhen500AndThen200() throws Exception {
267 REQUEST_COUNTER = new AtomicInteger();
268 final HttpRequest httpRequest = requestFor("/retry-get-500-200")
269 .method(HttpMethod.GET)
271 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
272 .retryConfig(defaultRetryConfig()
273 .retryableHttpResponseCodes(HashSet.of(500))
278 final Mono<String> bodyAsString = cut.call(httpRequest)
279 .doOnNext(HttpResponse::throwIfUnsuccessful)
280 .map(HttpResponse::bodyAsString);
283 StepVerifier.create(bodyAsString)
// Issues GET /retry-get-200 (scripted with a single 200 entry) through a client
// whose retry config treats HTTP 500 as retryable, and hands the resulting body
// string to StepVerifier.
// NOTE(review): the ends of the request and client builder chains, the
// StepVerifier expectations and any request-count assertion are not visible in
// this listing.
291 void getWithoutRetryWhen200() throws Exception {
293 REQUEST_COUNTER = new AtomicInteger();
294 final HttpRequest httpRequest = requestFor("/retry-get-200")
295 .method(HttpMethod.GET)
297 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
298 .retryConfig(defaultRetryConfig()
299 .retryableHttpResponseCodes(HashSet.of(500))
304 final Mono<String> bodyAsString = cut.call(httpRequest)
305 .doOnNext(HttpResponse::throwIfUnsuccessful)
306 .map(HttpResponse::bodyAsString);
309 StepVerifier.create(bodyAsString)
// Issues GET /retry-get-400 (scripted with a single 400 entry) through a client
// whose retry config treats only HTTP 500 as retryable, and checks that the
// emitted response carries status code 400.
// NOTE(review): the ends of the request and client builder chains, the
// remaining StepVerifier steps and any request-count assertion are not visible
// in this listing.
317 void getWithoutRetryWhen400() throws Exception {
319 REQUEST_COUNTER = new AtomicInteger();
320 final HttpRequest httpRequest = requestFor("/retry-get-400")
321 .method(HttpMethod.GET)
323 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
324 .retryConfig(defaultRetryConfig()
325 .retryableHttpResponseCodes(HashSet.of(500))
// No throwIfUnsuccessful here: the raw response is inspected instead.
330 Mono<HttpResponse> result = cut.call(httpRequest);
333 StepVerifier.create(result)
334 .consumeNextWith(this::assert400)
// Same flow as the plain GET /sample-get test, but the client is created from
// a config whose connectionPool is set to defaultConnectionPoolConfig().
// NOTE(review): the ends of the request and client builder chains and the
// StepVerifier expectations are not visible in this listing.
341 void simpleGetWithCustomConnectionPool() throws Exception {
343 final HttpRequest httpRequest = requestFor("/sample-get")
344 .method(HttpMethod.GET)
346 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
347 .connectionPool(defaultConnectionPoolConfig())
351 final Mono<String> bodyAsString = cut.call(httpRequest)
352 .doOnNext(HttpResponse::throwIfUnsuccessful)
353 .map(HttpResponse::bodyAsString);
356 StepVerifier.create(bodyAsString)
362 private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
363 return ImmutableHttpRequest.builder()
364 .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
367 private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
368 return ImmutableHttpRequest.builder()
369 .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
372 private ImmutableRetryConfig.Builder defaultRetryConfig() {
373 return ImmutableRetryConfig.builder()
374 .retryCount(RETRY_COUNT)
375 .retryInterval(RETRY_INTERVAL);
378 private void assertRetry() {
379 assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
382 private void assertNoRetry() {
383 assertThat(REQUEST_COUNTER.get()).isOne();
386 private void assertNoServerResponse() {
387 assertThat(REQUEST_COUNTER.get()).isZero();
390 private void assert400(HttpResponse httpResponse) {
391 assertThat(httpResponse.statusCode()).isEqualTo(400);
// Builds the connection pool settings used by the custom-connection-pool test.
// NOTE(review): part of the builder chain (between the type name and
// maxIdleTime, and after maxLifeTime) is not visible in this listing; only the
// two durations below can be confirmed from here.
394 private ImmutableConnectionPoolConfig defaultConnectionPoolConfig(){
395 return ImmutableConnectionPoolConfig
// Idle time limit of 5 seconds.
398 .maxIdleTime(Duration.ofSeconds(5))
// Life time limit of 10 seconds.
399 .maxLifeTime(Duration.ofSeconds(10))