2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
23 import io.netty.handler.codec.http.HttpResponseStatus;
24 import io.netty.handler.timeout.ReadTimeoutException;
26 import io.vavr.collection.HashSet;
27 import org.junit.jupiter.api.AfterAll;
28 import org.junit.jupiter.api.Test;
29 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
30 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
33 import reactor.core.publisher.Mono;
34 import reactor.test.StepVerifier;
36 import java.net.ConnectException;
37 import java.net.MalformedURLException;
39 import java.time.Duration;
40 import java.util.concurrent.atomic.AtomicInteger;
42 import static org.assertj.core.api.Assertions.assertThat;
43 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
44 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
46 class RxHttpClientIT {
48 private static final Duration TIMEOUT = Duration.ofHours(5);
49 private static final Duration NO_DELAY = Duration.ofSeconds(0);
50 private static final int RETRY_COUNT = 1;
51 private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
52 private static final DummyHttpServer HTTP_SERVER = initialize();
53 private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
54 private static final Mono<String> OK = Mono.just("OK");
55 private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
56 private static AtomicInteger REQUEST_COUNTER;
58 private static DummyHttpServer initialize() {
59 return DummyHttpServer.start(routes -> routes
60 .get("/sample-get", (req, resp) -> sendString(resp, OK))
61 .get("/delay-get", (req, resp) ->
62 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
63 .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
64 .get("/retry-get-500", (req, resp) ->
65 sendInOrderWithDelay(REQUEST_COUNTER,
66 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
67 .get("/retry-get-400", (req, resp) ->
68 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
69 .get("/retry-get-500-200", (req, resp) ->
70 sendInOrderWithDelay(REQUEST_COUNTER,
71 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
72 .get("/retry-get-200", (req, resp) ->
73 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
74 .post("/headers-post", (req, resp) -> resp
75 .sendString(Mono.just(req.requestHeaders().toString())))
76 .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
81 static void tearDownClass() {
86 void simpleGet() throws Exception {
88 final HttpRequest httpRequest = requestFor("/sample-get")
89 .method(HttpMethod.GET)
91 final RxHttpClient cut = RxHttpClientFactory.create();
94 final Mono<String> bodyAsString = cut.call(httpRequest)
95 .doOnNext(HttpResponse::throwIfUnsuccessful)
96 .map(HttpResponse::bodyAsString);
99 StepVerifier.create(bodyAsString)
106 void getWithError() throws Exception {
108 final HttpRequest httpRequest = requestFor("/sample-get-500")
109 .method(HttpMethod.GET)
111 final RxHttpClient cut = RxHttpClientFactory.create();
114 final Mono<String> bodyAsString = cut.call(httpRequest)
115 .doOnNext(HttpResponse::throwIfUnsuccessful)
116 .map(HttpResponse::bodyAsString);
119 StepVerifier.create(bodyAsString)
120 .expectError(HttpException.class)
125 void simplePost() throws Exception {
127 final String requestBody = "hello world";
128 final HttpRequest httpRequest = requestFor("/echo-post")
129 .method(HttpMethod.POST)
130 .body(RequestBody.fromString(requestBody))
132 final RxHttpClient cut = RxHttpClientFactory.create();
135 final Mono<String> bodyAsString = cut.call(httpRequest)
136 .doOnNext(HttpResponse::throwIfUnsuccessful)
137 .map(HttpResponse::bodyAsString);
140 StepVerifier.create(bodyAsString)
141 .expectNext(requestBody)
147 void testChunkedEncoding() throws Exception {
149 final String requestBody = "hello world";
150 final HttpRequest httpRequest = requestFor("/headers-post")
151 .method(HttpMethod.POST)
152 .body(RequestBody.chunkedFromString(Mono.just(requestBody)))
154 final RxHttpClient cut = RxHttpClientFactory.create();
157 final Mono<String> bodyAsString = cut.call(httpRequest)
158 .doOnNext(HttpResponse::throwIfUnsuccessful)
159 .map(HttpResponse::bodyAsString);
162 StepVerifier.create(bodyAsString.map(String::toLowerCase))
163 .consumeNextWith(responseBody -> {
164 assertThat(responseBody).contains("transfer-encoding: chunked");
165 assertThat(responseBody).doesNotContain("content-length");
172 void testUnchunkedEncoding() throws Exception {
174 final String requestBody = "hello world";
175 final HttpRequest httpRequest = requestFor("/headers-post")
176 .method(HttpMethod.POST)
177 .body(RequestBody.fromString(requestBody))
179 final RxHttpClient cut = RxHttpClientFactory.create();
182 final Mono<String> bodyAsString = cut.call(httpRequest)
183 .doOnNext(HttpResponse::throwIfUnsuccessful)
184 .map(HttpResponse::bodyAsString);
187 StepVerifier.create(bodyAsString.map(String::toLowerCase))
188 .consumeNextWith(responseBody -> {
189 assertThat(responseBody).doesNotContain("transfer-encoding");
190 assertThat(responseBody).contains("content-length");
197 void getWithTimeoutError() throws Exception {
199 REQUEST_COUNTER = new AtomicInteger();
200 final HttpRequest httpRequest = requestFor("/delay-get")
201 .method(HttpMethod.GET)
202 .timeout(Duration.ofMillis(1))
204 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
207 final Mono<HttpResponse> response = cut.call(httpRequest);
210 StepVerifier.create(response)
211 .expectError(ReadTimeoutException.class)
213 assertNoServerResponse();
217 void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception {
219 REQUEST_COUNTER = new AtomicInteger();
220 final HttpRequest httpRequest = requestForClosedServer("/sample-get")
221 .method(HttpMethod.GET)
223 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
224 .retryConfig(defaultRetryConfig()
225 .customRetryableExceptions(HashSet.of(ConnectException.class))
230 final Mono<HttpResponse> response = cut.call(httpRequest);
233 StepVerifier.create(response)
234 .expectError(IllegalStateException.class)
236 assertNoServerResponse();
240 void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception {
242 REQUEST_COUNTER = new AtomicInteger();
243 final HttpRequest httpRequest = requestForClosedServer("/sample-get")
244 .method(HttpMethod.GET)
246 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
247 .retryConfig(defaultRetryConfig()
248 .customRetryableExceptions(HashSet.of(ConnectException.class))
249 .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
254 final Mono<HttpResponse> response = cut.call(httpRequest);
257 StepVerifier.create(response)
258 .expectError(ReadTimeoutException.class)
260 assertNoServerResponse();
264 void getWithRetryExhaustedExceptionWhen500() throws Exception {
266 REQUEST_COUNTER = new AtomicInteger();
267 final HttpRequest httpRequest = requestFor("/retry-get-500")
268 .method(HttpMethod.GET)
270 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
271 .retryConfig(defaultRetryConfig()
272 .retryableHttpResponseCodes(HashSet.of(500))
277 final Mono<HttpResponse> response = cut.call(httpRequest);
280 StepVerifier.create(response)
281 .expectError(IllegalStateException.class)
287 void getWithCustomRetryExhaustedExceptionWhen500() throws Exception {
289 REQUEST_COUNTER = new AtomicInteger();
290 final HttpRequest httpRequest = requestFor("/retry-get-500")
291 .method(HttpMethod.GET)
293 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
294 .retryConfig(defaultRetryConfig()
295 .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
296 .retryableHttpResponseCodes(HashSet.of(500))
301 final Mono<HttpResponse> response = cut.call(httpRequest);
304 StepVerifier.create(response)
305 .expectError(ReadTimeoutException.class)
311 void getWithRetryWhen500AndThen200() throws Exception {
313 REQUEST_COUNTER = new AtomicInteger();
314 final HttpRequest httpRequest = requestFor("/retry-get-500-200")
315 .method(HttpMethod.GET)
317 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
318 .retryConfig(defaultRetryConfig()
319 .retryableHttpResponseCodes(HashSet.of(500))
324 final Mono<String> bodyAsString = cut.call(httpRequest)
325 .doOnNext(HttpResponse::throwIfUnsuccessful)
326 .map(HttpResponse::bodyAsString);
329 StepVerifier.create(bodyAsString)
337 void getWithoutRetryWhen200() throws Exception {
339 REQUEST_COUNTER = new AtomicInteger();
340 final HttpRequest httpRequest = requestFor("/retry-get-200")
341 .method(HttpMethod.GET)
343 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
344 .retryConfig(defaultRetryConfig()
345 .retryableHttpResponseCodes(HashSet.of(500))
350 final Mono<String> bodyAsString = cut.call(httpRequest)
351 .doOnNext(HttpResponse::throwIfUnsuccessful)
352 .map(HttpResponse::bodyAsString);
355 StepVerifier.create(bodyAsString)
363 void getWithoutRetryWhen400() throws Exception {
365 REQUEST_COUNTER = new AtomicInteger();
366 final HttpRequest httpRequest = requestFor("/retry-get-400")
367 .method(HttpMethod.GET)
369 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
370 .retryConfig(defaultRetryConfig()
371 .retryableHttpResponseCodes(HashSet.of(500))
376 Mono<HttpResponse> result = cut.call(httpRequest);
379 StepVerifier.create(result)
380 .consumeNextWith(this::assert400)
386 private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
387 return ImmutableHttpRequest.builder()
388 .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
391 private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
392 return ImmutableHttpRequest.builder()
393 .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
396 private ImmutableRetryConfig.Builder defaultRetryConfig() {
397 return ImmutableRetryConfig.builder()
398 .retryCount(RETRY_COUNT)
399 .retryInterval(RETRY_INTERVAL);
402 private void assertRetry() {
403 assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
406 private void assertNoRetry() {
407 assertThat(REQUEST_COUNTER.get()).isOne();
410 private void assertNoServerResponse() {
411 assertThat(REQUEST_COUNTER.get()).isZero();
414 private void assert400(HttpResponse httpResponse) {
415 assertThat(httpResponse.statusCode()).isEqualTo(400);