2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
23 import io.netty.handler.codec.http.HttpResponseStatus;
24 import io.netty.handler.timeout.ReadTimeoutException;
26 import io.vavr.collection.HashSet;
27 import org.junit.jupiter.api.AfterAll;
28 import org.junit.jupiter.api.Test;
29 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
30 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
34 import reactor.core.publisher.Mono;
35 import reactor.test.StepVerifier;
37 import java.net.ConnectException;
38 import java.net.MalformedURLException;
40 import java.time.Duration;
41 import java.util.concurrent.atomic.AtomicInteger;
43 import static org.assertj.core.api.Assertions.assertThat;
44 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
45 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
47 class RxHttpClientIT {
49 private static final Duration TIMEOUT = Duration.ofHours(5);
50 private static final Duration NO_DELAY = Duration.ofSeconds(0);
51 private static final int RETRY_COUNT = 1;
52 private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
53 private static final DummyHttpServer HTTP_SERVER = initialize();
54 private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
55 private static final Mono<String> OK = Mono.just("OK");
56 private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
57 private static AtomicInteger REQUEST_COUNTER;
59 private static DummyHttpServer initialize() {
60 return DummyHttpServer.start(routes -> routes
61 .get("/sample-get", (req, resp) -> sendString(resp, OK))
62 .get("/delay-get", (req, resp) ->
63 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
64 .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
65 .get("/retry-get-500", (req, resp) ->
66 sendInOrderWithDelay(REQUEST_COUNTER,
67 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
68 .get("/retry-get-400", (req, resp) ->
69 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
70 .get("/retry-get-500-200", (req, resp) ->
71 sendInOrderWithDelay(REQUEST_COUNTER,
72 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
73 .get("/retry-get-200", (req, resp) ->
74 sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
75 .post("/headers-post", (req, resp) -> resp
76 .sendString(Mono.just(req.requestHeaders().toString())))
77 .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
82 static void tearDownClass() {
87 void simpleGet() throws Exception {
89 final HttpRequest httpRequest = requestFor("/sample-get")
90 .method(HttpMethod.GET)
92 final RxHttpClient cut = RxHttpClientFactory.create();
95 final Mono<String> bodyAsString = cut.call(httpRequest)
96 .doOnNext(HttpResponse::throwIfUnsuccessful)
97 .map(HttpResponse::bodyAsString);
100 StepVerifier.create(bodyAsString)
107 void getWithError() throws Exception {
109 final HttpRequest httpRequest = requestFor("/sample-get-500")
110 .method(HttpMethod.GET)
112 final RxHttpClient cut = RxHttpClientFactory.create();
115 final Mono<String> bodyAsString = cut.call(httpRequest)
116 .doOnNext(HttpResponse::throwIfUnsuccessful)
117 .map(HttpResponse::bodyAsString);
120 StepVerifier.create(bodyAsString)
121 .expectError(HttpException.class)
126 void simplePost() throws Exception {
128 final String requestBody = "hello world";
129 final HttpRequest httpRequest = requestFor("/echo-post")
130 .method(HttpMethod.POST)
131 .body(RequestBody.fromString(requestBody))
133 final RxHttpClient cut = RxHttpClientFactory.create();
136 final Mono<String> bodyAsString = cut.call(httpRequest)
137 .doOnNext(HttpResponse::throwIfUnsuccessful)
138 .map(HttpResponse::bodyAsString);
141 StepVerifier.create(bodyAsString)
142 .expectNext(requestBody)
148 void testChunkedEncoding() throws Exception {
150 final String requestBody = "hello world";
151 final HttpRequest httpRequest = requestFor("/headers-post")
152 .method(HttpMethod.POST)
153 .body(RequestBody.chunkedFromString(Mono.just(requestBody)))
155 final RxHttpClient cut = RxHttpClientFactory.create();
158 final Mono<String> bodyAsString = cut.call(httpRequest)
159 .doOnNext(HttpResponse::throwIfUnsuccessful)
160 .map(HttpResponse::bodyAsString);
163 StepVerifier.create(bodyAsString.map(String::toLowerCase))
164 .consumeNextWith(responseBody -> {
165 assertThat(responseBody).contains("transfer-encoding: chunked");
166 assertThat(responseBody).doesNotContain("content-length");
173 void testUnchunkedEncoding() throws Exception {
175 final String requestBody = "hello world";
176 final HttpRequest httpRequest = requestFor("/headers-post")
177 .method(HttpMethod.POST)
178 .body(RequestBody.fromString(requestBody))
180 final RxHttpClient cut = RxHttpClientFactory.create();
183 final Mono<String> bodyAsString = cut.call(httpRequest)
184 .doOnNext(HttpResponse::throwIfUnsuccessful)
185 .map(HttpResponse::bodyAsString);
188 StepVerifier.create(bodyAsString.map(String::toLowerCase))
189 .consumeNextWith(responseBody -> {
190 assertThat(responseBody).doesNotContain("transfer-encoding");
191 assertThat(responseBody).contains("content-length");
198 void getWithTimeoutError() throws Exception {
200 REQUEST_COUNTER = new AtomicInteger();
201 final HttpRequest httpRequest = requestFor("/delay-get")
202 .method(HttpMethod.GET)
203 .timeout(Duration.ofMillis(1))
205 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
208 final Mono<HttpResponse> response = cut.call(httpRequest);
211 StepVerifier.create(response)
212 .expectError(ReadTimeoutException.class)
214 assertNoServerResponse();
218 void getWithConnectExceptionWhenClosedServer() throws Exception {
220 REQUEST_COUNTER = new AtomicInteger();
221 final HttpRequest httpRequest = requestForClosedServer("/sample-get")
222 .method(HttpMethod.GET)
224 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
225 .retryConfig(defaultRetryConfig()
226 .customRetryableExceptions(HashSet.of(ConnectException.class))
231 final Mono<HttpResponse> response = cut.call(httpRequest);
234 StepVerifier.create(response)
235 .expectError(ConnectException.class)
237 assertNoServerResponse();
241 void getWithRetryableExceptionWhen500() throws Exception {
243 REQUEST_COUNTER = new AtomicInteger();
244 final HttpRequest httpRequest = requestFor("/retry-get-500")
245 .method(HttpMethod.GET)
247 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
248 .retryConfig(defaultRetryConfig()
249 .retryableHttpResponseCodes(HashSet.of(500))
254 final Mono<HttpResponse> response = cut.call(httpRequest);
257 StepVerifier.create(response)
258 .expectError(RetryableException.class)
264 void getWithRetryWhen500AndThen200() throws Exception {
266 REQUEST_COUNTER = new AtomicInteger();
267 final HttpRequest httpRequest = requestFor("/retry-get-500-200")
268 .method(HttpMethod.GET)
270 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
271 .retryConfig(defaultRetryConfig()
272 .retryableHttpResponseCodes(HashSet.of(500))
277 final Mono<String> bodyAsString = cut.call(httpRequest)
278 .doOnNext(HttpResponse::throwIfUnsuccessful)
279 .map(HttpResponse::bodyAsString);
282 StepVerifier.create(bodyAsString)
290 void getWithoutRetryWhen200() throws Exception {
292 REQUEST_COUNTER = new AtomicInteger();
293 final HttpRequest httpRequest = requestFor("/retry-get-200")
294 .method(HttpMethod.GET)
296 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
297 .retryConfig(defaultRetryConfig()
298 .retryableHttpResponseCodes(HashSet.of(500))
303 final Mono<String> bodyAsString = cut.call(httpRequest)
304 .doOnNext(HttpResponse::throwIfUnsuccessful)
305 .map(HttpResponse::bodyAsString);
308 StepVerifier.create(bodyAsString)
316 void getWithoutRetryWhen400() throws Exception {
318 REQUEST_COUNTER = new AtomicInteger();
319 final HttpRequest httpRequest = requestFor("/retry-get-400")
320 .method(HttpMethod.GET)
322 final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
323 .retryConfig(defaultRetryConfig()
324 .retryableHttpResponseCodes(HashSet.of(500))
329 Mono<HttpResponse> result = cut.call(httpRequest);
332 StepVerifier.create(result)
333 .consumeNextWith(this::assert400)
339 private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
340 return ImmutableHttpRequest.builder()
341 .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
344 private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
345 return ImmutableHttpRequest.builder()
346 .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
349 private ImmutableRetryConfig.Builder defaultRetryConfig() {
350 return ImmutableRetryConfig.builder()
351 .retryCount(RETRY_COUNT)
352 .retryInterval(RETRY_INTERVAL);
355 private void assertRetry() {
356 assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
359 private void assertNoRetry() {
360 assertThat(REQUEST_COUNTER.get()).isOne();
363 private void assertNoServerResponse() {
364 assertThat(REQUEST_COUNTER.get()).isZero();
367 private void assert400(HttpResponse httpResponse) {
368 assertThat(httpResponse.statusCode()).isEqualTo(400);