8d076e02adf41640750e03933169136aba593c7a
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
22
23 import io.netty.handler.codec.http.HttpResponseStatus;
24 import io.netty.handler.timeout.ReadTimeoutException;
25 import io.vavr.Tuple;
26 import io.vavr.collection.HashSet;
27 import org.junit.jupiter.api.AfterAll;
28 import org.junit.jupiter.api.Test;
29 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
30 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
34 import reactor.core.publisher.Mono;
35 import reactor.test.StepVerifier;
36
37 import java.net.ConnectException;
38 import java.net.MalformedURLException;
39 import java.net.URL;
40 import java.time.Duration;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 import static org.assertj.core.api.Assertions.assertThat;
44 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
45 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
46
47 class RxHttpClientIT {
48
49     private static final Duration TIMEOUT = Duration.ofHours(5);
50     private static final Duration NO_DELAY = Duration.ofSeconds(0);
51     private static final int RETRY_COUNT = 1;
52     private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
53     private static final DummyHttpServer HTTP_SERVER = initialize();
54     private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
55     private static final Mono<String> OK = Mono.just("OK");
56     private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
57     private static AtomicInteger REQUEST_COUNTER;
58
59     private static DummyHttpServer initialize() {
60         return DummyHttpServer.start(routes -> routes
61                 .get("/sample-get", (req, resp) -> sendString(resp, OK))
62                 .get("/delay-get", (req, resp) ->
63                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
64                 .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
65                 .get("/retry-get-500", (req, resp) ->
66                         sendInOrderWithDelay(REQUEST_COUNTER,
67                                 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
68                 .get("/retry-get-400", (req, resp) ->
69                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
70                 .get("/retry-get-500-200", (req, resp) ->
71                         sendInOrderWithDelay(REQUEST_COUNTER,
72                                 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
73                 .get("/retry-get-200", (req, resp) ->
74                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
75                 .post("/headers-post", (req, resp) -> resp
76                         .sendString(Mono.just(req.requestHeaders().toString())))
77                 .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
78         );
79     }
80
81     @AfterAll
82     static void tearDownClass() {
83         HTTP_SERVER.close();
84     }
85
86     @Test
87     void simpleGet() throws Exception {
88         // given
89         final HttpRequest httpRequest = requestFor("/sample-get")
90                 .method(HttpMethod.GET)
91                 .build();
92         final RxHttpClient cut = RxHttpClientFactory.create();
93
94         // when
95         final Mono<String> bodyAsString = cut.call(httpRequest)
96                 .doOnNext(HttpResponse::throwIfUnsuccessful)
97                 .map(HttpResponse::bodyAsString);
98
99         // then
100         StepVerifier.create(bodyAsString)
101                 .expectNext("OK")
102                 .expectComplete()
103                 .verify(TIMEOUT);
104     }
105
106     @Test
107     void getWithError() throws Exception {
108         // given
109         final HttpRequest httpRequest = requestFor("/sample-get-500")
110                 .method(HttpMethod.GET)
111                 .build();
112         final RxHttpClient cut = RxHttpClientFactory.create();
113
114         // when
115         final Mono<String> bodyAsString = cut.call(httpRequest)
116                 .doOnNext(HttpResponse::throwIfUnsuccessful)
117                 .map(HttpResponse::bodyAsString);
118
119         // then
120         StepVerifier.create(bodyAsString)
121                 .expectError(HttpException.class)
122                 .verify(TIMEOUT);
123     }
124
125     @Test
126     void simplePost() throws Exception {
127         // given
128         final String requestBody = "hello world";
129         final HttpRequest httpRequest = requestFor("/echo-post")
130                 .method(HttpMethod.POST)
131                 .body(RequestBody.fromString(requestBody))
132                 .build();
133         final RxHttpClient cut = RxHttpClientFactory.create();
134
135         // when
136         final Mono<String> bodyAsString = cut.call(httpRequest)
137                 .doOnNext(HttpResponse::throwIfUnsuccessful)
138                 .map(HttpResponse::bodyAsString);
139
140         // then
141         StepVerifier.create(bodyAsString)
142                 .expectNext(requestBody)
143                 .expectComplete()
144                 .verify(TIMEOUT);
145     }
146
147     @Test
148     void testChunkedEncoding() throws Exception {
149         // given
150         final String requestBody = "hello world";
151         final HttpRequest httpRequest = requestFor("/headers-post")
152                 .method(HttpMethod.POST)
153                 .body(RequestBody.chunkedFromString(Mono.just(requestBody)))
154                 .build();
155         final RxHttpClient cut = RxHttpClientFactory.create();
156
157         // when
158         final Mono<String> bodyAsString = cut.call(httpRequest)
159                 .doOnNext(HttpResponse::throwIfUnsuccessful)
160                 .map(HttpResponse::bodyAsString);
161
162         // then
163         StepVerifier.create(bodyAsString.map(String::toLowerCase))
164                 .consumeNextWith(responseBody -> {
165                     assertThat(responseBody).contains("transfer-encoding: chunked");
166                     assertThat(responseBody).doesNotContain("content-length");
167                 })
168                 .expectComplete()
169                 .verify(TIMEOUT);
170     }
171
172     @Test
173     void testUnchunkedEncoding() throws Exception {
174         // given
175         final String requestBody = "hello world";
176         final HttpRequest httpRequest = requestFor("/headers-post")
177                 .method(HttpMethod.POST)
178                 .body(RequestBody.fromString(requestBody))
179                 .build();
180         final RxHttpClient cut = RxHttpClientFactory.create();
181
182         // when
183         final Mono<String> bodyAsString = cut.call(httpRequest)
184                 .doOnNext(HttpResponse::throwIfUnsuccessful)
185                 .map(HttpResponse::bodyAsString);
186
187         // then
188         StepVerifier.create(bodyAsString.map(String::toLowerCase))
189                 .consumeNextWith(responseBody -> {
190                     assertThat(responseBody).doesNotContain("transfer-encoding");
191                     assertThat(responseBody).contains("content-length");
192                 })
193                 .expectComplete()
194                 .verify(TIMEOUT);
195     }
196
197     @Test
198     void getWithTimeoutError() throws Exception {
199         // given
200         REQUEST_COUNTER = new AtomicInteger();
201         final HttpRequest httpRequest = requestFor("/delay-get")
202                 .method(HttpMethod.GET)
203                 .timeout(Duration.ofMillis(1))
204                 .build();
205         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
206
207         // when
208         final Mono<HttpResponse> response = cut.call(httpRequest);
209
210         // then
211         StepVerifier.create(response)
212                 .expectError(ReadTimeoutException.class)
213                 .verify(TIMEOUT);
214         assertNoServerResponse();
215     }
216
217     @Test
218     void getWithConnectExceptionWhenClosedServer() throws Exception {
219         // given
220         REQUEST_COUNTER = new AtomicInteger();
221         final HttpRequest httpRequest = requestForClosedServer("/sample-get")
222                 .method(HttpMethod.GET)
223                 .build();
224         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
225                 .retryConfig(defaultRetryConfig()
226                         .customRetryableExceptions(HashSet.of(ConnectException.class))
227                         .build())
228                 .build());
229
230         // when
231         final Mono<HttpResponse> response = cut.call(httpRequest);
232
233         // then
234         StepVerifier.create(response)
235                 .expectError(ConnectException.class)
236                 .verify(TIMEOUT);
237         assertNoServerResponse();
238     }
239
240     @Test
241     void getWithRetryableExceptionWhen500() throws Exception {
242         // given
243         REQUEST_COUNTER = new AtomicInteger();
244         final HttpRequest httpRequest = requestFor("/retry-get-500")
245                 .method(HttpMethod.GET)
246                 .build();
247         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
248                 .retryConfig(defaultRetryConfig()
249                         .retryableHttpResponseCodes(HashSet.of(500))
250                         .build())
251                 .build());
252
253         // when
254         final Mono<HttpResponse> response = cut.call(httpRequest);
255
256         // then
257         StepVerifier.create(response)
258                 .expectError(RetryableException.class)
259                 .verify(TIMEOUT);
260         assertRetry();
261     }
262
263     @Test
264     void getWithRetryWhen500AndThen200() throws Exception {
265         // given
266         REQUEST_COUNTER = new AtomicInteger();
267         final HttpRequest httpRequest = requestFor("/retry-get-500-200")
268                 .method(HttpMethod.GET)
269                 .build();
270         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
271                 .retryConfig(defaultRetryConfig()
272                         .retryableHttpResponseCodes(HashSet.of(500))
273                         .build())
274                 .build());
275
276         // when
277         final Mono<String> bodyAsString = cut.call(httpRequest)
278                 .doOnNext(HttpResponse::throwIfUnsuccessful)
279                 .map(HttpResponse::bodyAsString);
280
281         // then
282         StepVerifier.create(bodyAsString)
283                 .expectNext("OK")
284                 .expectComplete()
285                 .verify(TIMEOUT);
286         assertRetry();
287     }
288
289     @Test
290     void getWithoutRetryWhen200() throws Exception {
291         // given
292         REQUEST_COUNTER = new AtomicInteger();
293         final HttpRequest httpRequest = requestFor("/retry-get-200")
294                 .method(HttpMethod.GET)
295                 .build();
296         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
297                 .retryConfig(defaultRetryConfig()
298                         .retryableHttpResponseCodes(HashSet.of(500))
299                         .build())
300                 .build());
301
302         // when
303         final Mono<String> bodyAsString = cut.call(httpRequest)
304                 .doOnNext(HttpResponse::throwIfUnsuccessful)
305                 .map(HttpResponse::bodyAsString);
306
307         // then
308         StepVerifier.create(bodyAsString)
309                 .expectNext("OK")
310                 .expectComplete()
311                 .verify(TIMEOUT);
312         assertNoRetry();
313     }
314
315     @Test
316     void getWithoutRetryWhen400() throws Exception {
317         // given
318         REQUEST_COUNTER = new AtomicInteger();
319         final HttpRequest httpRequest = requestFor("/retry-get-400")
320                 .method(HttpMethod.GET)
321                 .build();
322         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
323                 .retryConfig(defaultRetryConfig()
324                         .retryableHttpResponseCodes(HashSet.of(500))
325                         .build())
326                 .build());
327
328         // when
329         Mono<HttpResponse> result = cut.call(httpRequest);
330
331         // then
332         StepVerifier.create(result)
333                 .consumeNextWith(this::assert400)
334                 .expectComplete()
335                 .verify(TIMEOUT);
336         assertNoRetry();
337     }
338
339     private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
340         return ImmutableHttpRequest.builder()
341                 .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
342     }
343
344     private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
345         return ImmutableHttpRequest.builder()
346                 .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
347     }
348
349     private ImmutableRetryConfig.Builder defaultRetryConfig() {
350         return ImmutableRetryConfig.builder()
351                 .retryCount(RETRY_COUNT)
352                 .retryInterval(RETRY_INTERVAL);
353     }
354
355     private void assertRetry() {
356         assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
357     }
358
359     private void assertNoRetry() {
360         assertThat(REQUEST_COUNTER.get()).isOne();
361     }
362
363     private void assertNoServerResponse() {
364         assertThat(REQUEST_COUNTER.get()).isZero();
365     }
366     
367     private void assert400(HttpResponse httpResponse) {
368         assertThat(httpResponse.statusCode()).isEqualTo(400);
369     }
370 }