Support retry in DCAE-SDK DMaaP-Client
[dcaegen2/services/sdk.git] / rest-services / http-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / adapters / http / RxHttpClientIT.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
22
23 import io.netty.handler.codec.http.HttpResponseStatus;
24 import io.netty.handler.timeout.ReadTimeoutException;
25 import io.vavr.Tuple;
26 import io.vavr.collection.HashSet;
27 import org.junit.jupiter.api.AfterAll;
28 import org.junit.jupiter.api.Test;
29 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
30 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
33 import reactor.core.publisher.Mono;
34 import reactor.test.StepVerifier;
35
36 import java.net.ConnectException;
37 import java.net.MalformedURLException;
38 import java.net.URL;
39 import java.time.Duration;
40 import java.util.concurrent.atomic.AtomicInteger;
41
42 import static org.assertj.core.api.Assertions.assertThat;
43 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
44 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
45
46 class RxHttpClientIT {
47
48     private static final Duration TIMEOUT = Duration.ofHours(5);
49     private static final Duration NO_DELAY = Duration.ofSeconds(0);
50     private static final int RETRY_COUNT = 1;
51     private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
52     private static final DummyHttpServer HTTP_SERVER = initialize();
53     private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
54     private static final Mono<String> OK = Mono.just("OK");
55     private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
56     private static AtomicInteger REQUEST_COUNTER;
57
58     private static DummyHttpServer initialize() {
59         return DummyHttpServer.start(routes -> routes
60                 .get("/sample-get", (req, resp) -> sendString(resp, OK))
61                 .get("/delay-get", (req, resp) ->
62                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
63                 .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
64                 .get("/retry-get-500", (req, resp) ->
65                         sendInOrderWithDelay(REQUEST_COUNTER,
66                                 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
67                 .get("/retry-get-400", (req, resp) ->
68                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
69                 .get("/retry-get-500-200", (req, resp) ->
70                         sendInOrderWithDelay(REQUEST_COUNTER,
71                                 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
72                 .get("/retry-get-200", (req, resp) ->
73                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
74                 .post("/headers-post", (req, resp) -> resp
75                         .sendString(Mono.just(req.requestHeaders().toString())))
76                 .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
77         );
78     }
79
80     @AfterAll
81     static void tearDownClass() {
82         HTTP_SERVER.close();
83     }
84
85     @Test
86     void simpleGet() throws Exception {
87         // given
88         final HttpRequest httpRequest = requestFor("/sample-get")
89                 .method(HttpMethod.GET)
90                 .build();
91         final RxHttpClient cut = RxHttpClientFactory.create();
92
93         // when
94         final Mono<String> bodyAsString = cut.call(httpRequest)
95                 .doOnNext(HttpResponse::throwIfUnsuccessful)
96                 .map(HttpResponse::bodyAsString);
97
98         // then
99         StepVerifier.create(bodyAsString)
100                 .expectNext("OK")
101                 .expectComplete()
102                 .verify(TIMEOUT);
103     }
104
105     @Test
106     void getWithError() throws Exception {
107         // given
108         final HttpRequest httpRequest = requestFor("/sample-get-500")
109                 .method(HttpMethod.GET)
110                 .build();
111         final RxHttpClient cut = RxHttpClientFactory.create();
112
113         // when
114         final Mono<String> bodyAsString = cut.call(httpRequest)
115                 .doOnNext(HttpResponse::throwIfUnsuccessful)
116                 .map(HttpResponse::bodyAsString);
117
118         // then
119         StepVerifier.create(bodyAsString)
120                 .expectError(HttpException.class)
121                 .verify(TIMEOUT);
122     }
123
124     @Test
125     void simplePost() throws Exception {
126         // given
127         final String requestBody = "hello world";
128         final HttpRequest httpRequest = requestFor("/echo-post")
129                 .method(HttpMethod.POST)
130                 .body(RequestBody.fromString(requestBody))
131                 .build();
132         final RxHttpClient cut = RxHttpClientFactory.create();
133
134         // when
135         final Mono<String> bodyAsString = cut.call(httpRequest)
136                 .doOnNext(HttpResponse::throwIfUnsuccessful)
137                 .map(HttpResponse::bodyAsString);
138
139         // then
140         StepVerifier.create(bodyAsString)
141                 .expectNext(requestBody)
142                 .expectComplete()
143                 .verify(TIMEOUT);
144     }
145
146     @Test
147     void testChunkedEncoding() throws Exception {
148         // given
149         final String requestBody = "hello world";
150         final HttpRequest httpRequest = requestFor("/headers-post")
151                 .method(HttpMethod.POST)
152                 .body(RequestBody.chunkedFromString(Mono.just(requestBody)))
153                 .build();
154         final RxHttpClient cut = RxHttpClientFactory.create();
155
156         // when
157         final Mono<String> bodyAsString = cut.call(httpRequest)
158                 .doOnNext(HttpResponse::throwIfUnsuccessful)
159                 .map(HttpResponse::bodyAsString);
160
161         // then
162         StepVerifier.create(bodyAsString.map(String::toLowerCase))
163                 .consumeNextWith(responseBody -> {
164                     assertThat(responseBody).contains("transfer-encoding: chunked");
165                     assertThat(responseBody).doesNotContain("content-length");
166                 })
167                 .expectComplete()
168                 .verify(TIMEOUT);
169     }
170
171     @Test
172     void testUnchunkedEncoding() throws Exception {
173         // given
174         final String requestBody = "hello world";
175         final HttpRequest httpRequest = requestFor("/headers-post")
176                 .method(HttpMethod.POST)
177                 .body(RequestBody.fromString(requestBody))
178                 .build();
179         final RxHttpClient cut = RxHttpClientFactory.create();
180
181         // when
182         final Mono<String> bodyAsString = cut.call(httpRequest)
183                 .doOnNext(HttpResponse::throwIfUnsuccessful)
184                 .map(HttpResponse::bodyAsString);
185
186         // then
187         StepVerifier.create(bodyAsString.map(String::toLowerCase))
188                 .consumeNextWith(responseBody -> {
189                     assertThat(responseBody).doesNotContain("transfer-encoding");
190                     assertThat(responseBody).contains("content-length");
191                 })
192                 .expectComplete()
193                 .verify(TIMEOUT);
194     }
195
196     @Test
197     void getWithTimeoutError() throws Exception {
198         // given
199         REQUEST_COUNTER = new AtomicInteger();
200         final HttpRequest httpRequest = requestFor("/delay-get")
201                 .method(HttpMethod.GET)
202                 .timeout(Duration.ofMillis(1))
203                 .build();
204         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
205
206         // when
207         final Mono<HttpResponse> response = cut.call(httpRequest);
208
209         // then
210         StepVerifier.create(response)
211                 .expectError(ReadTimeoutException.class)
212                 .verify(TIMEOUT);
213         assertNoServerResponse();
214     }
215
216     @Test
217     void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception {
218         // given
219         REQUEST_COUNTER = new AtomicInteger();
220         final HttpRequest httpRequest = requestForClosedServer("/sample-get")
221                 .method(HttpMethod.GET)
222                 .build();
223         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
224                 .retryConfig(defaultRetryConfig()
225                         .customRetryableExceptions(HashSet.of(ConnectException.class))
226                         .build())
227                 .build());
228
229         // when
230         final Mono<HttpResponse> response = cut.call(httpRequest);
231
232         // then
233         StepVerifier.create(response)
234                 .expectError(IllegalStateException.class)
235                 .verify(TIMEOUT);
236         assertNoServerResponse();
237     }
238
239     @Test
240     void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception {
241         // given
242         REQUEST_COUNTER = new AtomicInteger();
243         final HttpRequest httpRequest = requestForClosedServer("/sample-get")
244                 .method(HttpMethod.GET)
245                 .build();
246         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
247                 .retryConfig(defaultRetryConfig()
248                         .customRetryableExceptions(HashSet.of(ConnectException.class))
249                         .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
250                         .build())
251                 .build());
252
253         // when
254         final Mono<HttpResponse> response = cut.call(httpRequest);
255
256         // then
257         StepVerifier.create(response)
258                 .expectError(ReadTimeoutException.class)
259                 .verify(TIMEOUT);
260         assertNoServerResponse();
261     }
262
263     @Test
264     void getWithRetryExhaustedExceptionWhen500() throws Exception {
265         // given
266         REQUEST_COUNTER = new AtomicInteger();
267         final HttpRequest httpRequest = requestFor("/retry-get-500")
268                 .method(HttpMethod.GET)
269                 .build();
270         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
271                 .retryConfig(defaultRetryConfig()
272                         .retryableHttpResponseCodes(HashSet.of(500))
273                         .build())
274                 .build());
275
276         // when
277         final Mono<HttpResponse> response = cut.call(httpRequest);
278
279         // then
280         StepVerifier.create(response)
281                 .expectError(IllegalStateException.class)
282                 .verify(TIMEOUT);
283         assertRetry();
284     }
285
286     @Test
287     void getWithCustomRetryExhaustedExceptionWhen500() throws Exception {
288         // given
289         REQUEST_COUNTER = new AtomicInteger();
290         final HttpRequest httpRequest = requestFor("/retry-get-500")
291                 .method(HttpMethod.GET)
292                 .build();
293         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
294                 .retryConfig(defaultRetryConfig()
295                         .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
296                         .retryableHttpResponseCodes(HashSet.of(500))
297                         .build())
298                 .build());
299
300         // when
301         final Mono<HttpResponse> response = cut.call(httpRequest);
302
303         // then
304         StepVerifier.create(response)
305                 .expectError(ReadTimeoutException.class)
306                 .verify(TIMEOUT);
307         assertRetry();
308     }
309
310     @Test
311     void getWithRetryWhen500AndThen200() throws Exception {
312         // given
313         REQUEST_COUNTER = new AtomicInteger();
314         final HttpRequest httpRequest = requestFor("/retry-get-500-200")
315                 .method(HttpMethod.GET)
316                 .build();
317         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
318                 .retryConfig(defaultRetryConfig()
319                         .retryableHttpResponseCodes(HashSet.of(500))
320                         .build())
321                 .build());
322
323         // when
324         final Mono<String> bodyAsString = cut.call(httpRequest)
325                 .doOnNext(HttpResponse::throwIfUnsuccessful)
326                 .map(HttpResponse::bodyAsString);
327
328         // then
329         StepVerifier.create(bodyAsString)
330                 .expectNext("OK")
331                 .expectComplete()
332                 .verify(TIMEOUT);
333         assertRetry();
334     }
335
336     @Test
337     void getWithoutRetryWhen200() throws Exception {
338         // given
339         REQUEST_COUNTER = new AtomicInteger();
340         final HttpRequest httpRequest = requestFor("/retry-get-200")
341                 .method(HttpMethod.GET)
342                 .build();
343         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
344                 .retryConfig(defaultRetryConfig()
345                         .retryableHttpResponseCodes(HashSet.of(500))
346                         .build())
347                 .build());
348
349         // when
350         final Mono<String> bodyAsString = cut.call(httpRequest)
351                 .doOnNext(HttpResponse::throwIfUnsuccessful)
352                 .map(HttpResponse::bodyAsString);
353
354         // then
355         StepVerifier.create(bodyAsString)
356                 .expectNext("OK")
357                 .expectComplete()
358                 .verify(TIMEOUT);
359         assertNoRetry();
360     }
361
362     @Test
363     void getWithoutRetryWhen400() throws Exception {
364         // given
365         REQUEST_COUNTER = new AtomicInteger();
366         final HttpRequest httpRequest = requestFor("/retry-get-400")
367                 .method(HttpMethod.GET)
368                 .build();
369         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
370                 .retryConfig(defaultRetryConfig()
371                         .retryableHttpResponseCodes(HashSet.of(500))
372                         .build())
373                 .build());
374
375         // when
376         Mono<HttpResponse> result = cut.call(httpRequest);
377
378         // then
379         StepVerifier.create(result)
380                 .consumeNextWith(this::assert400)
381                 .expectComplete()
382                 .verify(TIMEOUT);
383         assertNoRetry();
384     }
385
386     private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
387         return ImmutableHttpRequest.builder()
388                 .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
389     }
390
391     private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
392         return ImmutableHttpRequest.builder()
393                 .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
394     }
395
396     private ImmutableRetryConfig.Builder defaultRetryConfig() {
397         return ImmutableRetryConfig.builder()
398                 .retryCount(RETRY_COUNT)
399                 .retryInterval(RETRY_INTERVAL);
400     }
401
402     private void assertRetry() {
403         assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
404     }
405
406     private void assertNoRetry() {
407         assertThat(REQUEST_COUNTER.get()).isOne();
408     }
409
410     private void assertNoServerResponse() {
411         assertThat(REQUEST_COUNTER.get()).isZero();
412     }
413     
414     private void assert400(HttpResponse httpResponse) {
415         assertThat(httpResponse.statusCode()).isEqualTo(400);
416     }
417 }