ff4f0297af68e1e820f0cf67045cd42911eafa36
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
22
23 import io.netty.handler.codec.http.HttpResponseStatus;
24 import io.netty.handler.timeout.ReadTimeoutException;
25 import io.vavr.Tuple;
26 import io.vavr.collection.HashSet;
27 import org.junit.jupiter.api.AfterAll;
28 import org.junit.jupiter.api.Test;
29 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableConnectionPoolConfig;
30 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
35 import reactor.core.publisher.Mono;
36 import reactor.test.StepVerifier;
37
38 import java.net.ConnectException;
39 import java.net.MalformedURLException;
40 import java.net.URL;
41 import java.time.Duration;
42 import java.util.concurrent.atomic.AtomicInteger;
43
44 import static org.assertj.core.api.Assertions.assertThat;
45 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
46 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
47
48 class RxHttpClientIT {
49
50     private static final Duration TIMEOUT = Duration.ofHours(5);
51     private static final Duration NO_DELAY = Duration.ofSeconds(0);
52     private static final int RETRY_COUNT = 1;
53     private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
54     private static final DummyHttpServer HTTP_SERVER = initialize();
55     private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
56     private static final Mono<String> OK = Mono.just("OK");
57     private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
58     private static AtomicInteger REQUEST_COUNTER;
59
60     private static DummyHttpServer initialize() {
61         return DummyHttpServer.start(routes -> routes
62                 .get("/sample-get", (req, resp) -> sendString(resp, OK))
63                 .get("/delay-get", (req, resp) ->
64                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
65                 .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
66                 .get("/retry-get-500", (req, resp) ->
67                         sendInOrderWithDelay(REQUEST_COUNTER,
68                                 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
69                 .get("/retry-get-400", (req, resp) ->
70                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
71                 .get("/retry-get-500-200", (req, resp) ->
72                         sendInOrderWithDelay(REQUEST_COUNTER,
73                                 Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
74                 .get("/retry-get-200", (req, resp) ->
75                         sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
76                 .post("/headers-post", (req, resp) -> resp
77                         .sendString(Mono.just(req.requestHeaders().toString())))
78                 .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
79         );
80     }
81
82     @AfterAll
83     static void tearDownClass() {
84         HTTP_SERVER.close();
85     }
86
87     @Test
88     void simpleGet() throws Exception {
89         // given
90         final HttpRequest httpRequest = requestFor("/sample-get")
91                 .method(HttpMethod.GET)
92                 .build();
93         final RxHttpClient cut = RxHttpClientFactory.create();
94
95         // when
96         final Mono<String> bodyAsString = cut.call(httpRequest)
97                 .doOnNext(HttpResponse::throwIfUnsuccessful)
98                 .map(HttpResponse::bodyAsString);
99
100         // then
101         StepVerifier.create(bodyAsString)
102                 .expectNext("OK")
103                 .expectComplete()
104                 .verify(TIMEOUT);
105     }
106
107     @Test
108     void getWithError() throws Exception {
109         // given
110         final HttpRequest httpRequest = requestFor("/sample-get-500")
111                 .method(HttpMethod.GET)
112                 .build();
113         final RxHttpClient cut = RxHttpClientFactory.create();
114
115         // when
116         final Mono<String> bodyAsString = cut.call(httpRequest)
117                 .doOnNext(HttpResponse::throwIfUnsuccessful)
118                 .map(HttpResponse::bodyAsString);
119
120         // then
121         StepVerifier.create(bodyAsString)
122                 .expectError(HttpException.class)
123                 .verify(TIMEOUT);
124     }
125
126     @Test
127     void simplePost() throws Exception {
128         // given
129         final String requestBody = "hello world";
130         final HttpRequest httpRequest = requestFor("/echo-post")
131                 .method(HttpMethod.POST)
132                 .body(RequestBody.fromString(requestBody))
133                 .build();
134         final RxHttpClient cut = RxHttpClientFactory.create();
135
136         // when
137         final Mono<String> bodyAsString = cut.call(httpRequest)
138                 .doOnNext(HttpResponse::throwIfUnsuccessful)
139                 .map(HttpResponse::bodyAsString);
140
141         // then
142         StepVerifier.create(bodyAsString)
143                 .expectNext(requestBody)
144                 .expectComplete()
145                 .verify(TIMEOUT);
146     }
147
148     @Test
149     void testChunkedEncoding() throws Exception {
150         // given
151         final String requestBody = "hello world";
152         final HttpRequest httpRequest = requestFor("/headers-post")
153                 .method(HttpMethod.POST)
154                 .body(RequestBody.chunkedFromString(Mono.just(requestBody)))
155                 .build();
156         final RxHttpClient cut = RxHttpClientFactory.create();
157
158         // when
159         final Mono<String> bodyAsString = cut.call(httpRequest)
160                 .doOnNext(HttpResponse::throwIfUnsuccessful)
161                 .map(HttpResponse::bodyAsString);
162
163         // then
164         StepVerifier.create(bodyAsString.map(String::toLowerCase))
165                 .consumeNextWith(responseBody -> {
166                     assertThat(responseBody).contains("transfer-encoding: chunked");
167                     assertThat(responseBody).doesNotContain("content-length");
168                 })
169                 .expectComplete()
170                 .verify(TIMEOUT);
171     }
172
173     @Test
174     void testUnchunkedEncoding() throws Exception {
175         // given
176         final String requestBody = "hello world";
177         final HttpRequest httpRequest = requestFor("/headers-post")
178                 .method(HttpMethod.POST)
179                 .body(RequestBody.fromString(requestBody))
180                 .build();
181         final RxHttpClient cut = RxHttpClientFactory.create();
182
183         // when
184         final Mono<String> bodyAsString = cut.call(httpRequest)
185                 .doOnNext(HttpResponse::throwIfUnsuccessful)
186                 .map(HttpResponse::bodyAsString);
187
188         // then
189         StepVerifier.create(bodyAsString.map(String::toLowerCase))
190                 .consumeNextWith(responseBody -> {
191                     assertThat(responseBody).doesNotContain("transfer-encoding");
192                     assertThat(responseBody).contains("content-length");
193                 })
194                 .expectComplete()
195                 .verify(TIMEOUT);
196     }
197
198     @Test
199     void getWithTimeoutError() throws Exception {
200         // given
201         REQUEST_COUNTER = new AtomicInteger();
202         final HttpRequest httpRequest = requestFor("/delay-get")
203                 .method(HttpMethod.GET)
204                 .timeout(Duration.ofMillis(1))
205                 .build();
206         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
207
208         // when
209         final Mono<HttpResponse> response = cut.call(httpRequest);
210
211         // then
212         StepVerifier.create(response)
213                 .expectError(ReadTimeoutException.class)
214                 .verify(TIMEOUT);
215         assertNoServerResponse();
216     }
217
218     @Test
219     void getWithConnectExceptionWhenClosedServer() throws Exception {
220         // given
221         REQUEST_COUNTER = new AtomicInteger();
222         final HttpRequest httpRequest = requestForClosedServer("/sample-get")
223                 .method(HttpMethod.GET)
224                 .build();
225         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
226                 .retryConfig(defaultRetryConfig()
227                         .customRetryableExceptions(HashSet.of(ConnectException.class))
228                         .build())
229                 .build());
230
231         // when
232         final Mono<HttpResponse> response = cut.call(httpRequest);
233
234         // then
235         StepVerifier.create(response)
236                 .expectError(ConnectException.class)
237                 .verify(TIMEOUT);
238         assertNoServerResponse();
239     }
240
241     @Test
242     void getWithRetryableExceptionWhen500() throws Exception {
243         // given
244         REQUEST_COUNTER = new AtomicInteger();
245         final HttpRequest httpRequest = requestFor("/retry-get-500")
246                 .method(HttpMethod.GET)
247                 .build();
248         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
249                 .retryConfig(defaultRetryConfig()
250                         .retryableHttpResponseCodes(HashSet.of(500))
251                         .build())
252                 .build());
253
254         // when
255         final Mono<HttpResponse> response = cut.call(httpRequest);
256
257         // then
258         StepVerifier.create(response)
259                 .expectError(RetryableException.class)
260                 .verify(TIMEOUT);
261         assertRetry();
262     }
263
264     @Test
265     void getWithRetryWhen500AndThen200() throws Exception {
266         // given
267         REQUEST_COUNTER = new AtomicInteger();
268         final HttpRequest httpRequest = requestFor("/retry-get-500-200")
269                 .method(HttpMethod.GET)
270                 .build();
271         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
272                 .retryConfig(defaultRetryConfig()
273                         .retryableHttpResponseCodes(HashSet.of(500))
274                         .build())
275                 .build());
276
277         // when
278         final Mono<String> bodyAsString = cut.call(httpRequest)
279                 .doOnNext(HttpResponse::throwIfUnsuccessful)
280                 .map(HttpResponse::bodyAsString);
281
282         // then
283         StepVerifier.create(bodyAsString)
284                 .expectNext("OK")
285                 .expectComplete()
286                 .verify(TIMEOUT);
287         assertRetry();
288     }
289
290     @Test
291     void getWithoutRetryWhen200() throws Exception {
292         // given
293         REQUEST_COUNTER = new AtomicInteger();
294         final HttpRequest httpRequest = requestFor("/retry-get-200")
295                 .method(HttpMethod.GET)
296                 .build();
297         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
298                 .retryConfig(defaultRetryConfig()
299                         .retryableHttpResponseCodes(HashSet.of(500))
300                         .build())
301                 .build());
302
303         // when
304         final Mono<String> bodyAsString = cut.call(httpRequest)
305                 .doOnNext(HttpResponse::throwIfUnsuccessful)
306                 .map(HttpResponse::bodyAsString);
307
308         // then
309         StepVerifier.create(bodyAsString)
310                 .expectNext("OK")
311                 .expectComplete()
312                 .verify(TIMEOUT);
313         assertNoRetry();
314     }
315
316     @Test
317     void getWithoutRetryWhen400() throws Exception {
318         // given
319         REQUEST_COUNTER = new AtomicInteger();
320         final HttpRequest httpRequest = requestFor("/retry-get-400")
321                 .method(HttpMethod.GET)
322                 .build();
323         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
324                 .retryConfig(defaultRetryConfig()
325                         .retryableHttpResponseCodes(HashSet.of(500))
326                         .build())
327                 .build());
328
329         // when
330         Mono<HttpResponse> result = cut.call(httpRequest);
331
332         // then
333         StepVerifier.create(result)
334                 .consumeNextWith(this::assert400)
335                 .expectComplete()
336                 .verify(TIMEOUT);
337         assertNoRetry();
338     }
339
340     @Test
341     void simpleGetWithCustomConnectionPool() throws Exception {
342         // given
343         final HttpRequest httpRequest = requestFor("/sample-get")
344                 .method(HttpMethod.GET)
345                 .build();
346         final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
347                 .connectionPool(defaultConnectionPoolConfig())
348                 .build());
349
350         // when
351         final Mono<String> bodyAsString = cut.call(httpRequest)
352                 .doOnNext(HttpResponse::throwIfUnsuccessful)
353                 .map(HttpResponse::bodyAsString);
354
355         // then
356         StepVerifier.create(bodyAsString)
357                 .expectNext("OK")
358                 .expectComplete()
359                 .verify(TIMEOUT);
360     }
361
362     private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
363         return ImmutableHttpRequest.builder()
364                 .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
365     }
366
367     private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
368         return ImmutableHttpRequest.builder()
369                 .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
370     }
371
372     private ImmutableRetryConfig.Builder defaultRetryConfig() {
373         return ImmutableRetryConfig.builder()
374                 .retryCount(RETRY_COUNT)
375                 .retryInterval(RETRY_INTERVAL);
376     }
377
378     private void assertRetry() {
379         assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
380     }
381
382     private void assertNoRetry() {
383         assertThat(REQUEST_COUNTER.get()).isOne();
384     }
385
386     private void assertNoServerResponse() {
387         assertThat(REQUEST_COUNTER.get()).isZero();
388     }
389     
390     private void assert400(HttpResponse httpResponse) {
391         assertThat(httpResponse.statusCode()).isEqualTo(400);
392     }
393
394     private ImmutableConnectionPoolConfig defaultConnectionPoolConfig(){
395         return ImmutableConnectionPoolConfig
396                 .builder()
397                 .connectionPool(1)
398                 .maxIdleTime(Duration.ofSeconds(5))
399                 .maxLifeTime(Duration.ofSeconds(10))
400                 .build();
401     }
402 }