Support retry in DCAE-SDK DMaaP-Client
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / impl / MessageRouterPublisherImplTest.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.List;
32 import org.junit.jupiter.api.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
48
49 import java.nio.charset.StandardCharsets;
50 import java.time.Duration;
51
52 import static org.assertj.core.api.Assertions.assertThat;
53 import static org.mockito.ArgumentMatchers.any;
54 import static org.mockito.BDDMockito.given;
55 import static org.mockito.Mockito.mock;
56 import static org.mockito.Mockito.times;
57 import static org.mockito.Mockito.verify;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
64
65 /**
66  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
67  * @since April 2019
68  */
69 class MessageRouterPublisherImplTest {
70     private static final Duration TIMEOUT = Duration.ofSeconds(5);
71     private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
72     private static final int MAX_BATCH_SIZE = 3;
73     private static final String ERROR_MESSAGE = "Something went wrong";
74     private final RxHttpClient httpClient = mock(RxHttpClient.class);
75     private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
76     private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
77             httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
78     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
79     private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
80     private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
81     private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
82
83     @Test
84     void puttingElementsShouldYieldNonChunkedHttpRequest() {
85         // given
86         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
87         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
88         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
89
90         // when
91         final Flux<MessageRouterPublishResponse> responses = cut
92                 .put(jsonPublishRequest, singleJsonMessageBatch);
93         responses.then().block();
94
95         // then
96         verify(httpClient).call(httpRequestArgumentCaptor.capture());
97         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
98         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
99         assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
100         assertThat(httpRequest.body()).isNotNull();
101         assertThat(httpRequest.body().length()).isPositive();
102     }
103
104     @Test
105     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
106         // given
107         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
108         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
109
110         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
111         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
112
113         // when
114         final Flux<MessageRouterPublishResponse> responses = cut
115                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
116         responses.then().block();
117
118         // then
119         verify(httpClient).call(httpRequestArgumentCaptor.capture());
120         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
121         final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
122
123         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
124                 .isEqualTo(MAX_BATCH_SIZE);
125         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
126     }
127
128
129     @Test
130     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
131         // given
132         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
133         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
134
135         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
136         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
137
138         // when
139         final Flux<MessageRouterPublishResponse> responses = cut
140                 .put(plainPublishRequest, plainMessagesMaxBatch);
141         responses.then().block();
142
143         // then
144         verify(httpClient).call(httpRequestArgumentCaptor.capture());
145         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
146         final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
147                 .map(JsonElement::getAsJsonObject);
148
149
150         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
151                 .isEqualTo(MAX_BATCH_SIZE);
152         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
153     }
154
155     @Test
156     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
157         // given
158         final List<String> threePlainMessages = List.of("I", "like", "cookies");
159         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
160
161         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
162         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
163
164         // when
165         final Flux<MessageRouterPublishResponse> responses = cut
166                 .put(plainPublishRequest, plainMessagesMaxBatch);
167         responses.then().block();
168
169         // then
170         verify(httpClient).call(httpRequestArgumentCaptor.capture());
171         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
172         final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
173                 .map(JsonElement::getAsJsonPrimitive);
174
175         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
176                 .isEqualTo(MAX_BATCH_SIZE);
177         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
178     }
179
180     @Test
181     void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
182         // given
183         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
184         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
185         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
186
187         // when
188         final Flux<MessageRouterPublishResponse> responses = cut
189                 .put(jsonPublishRequest, singleJsonMessageBatch);
190         responses.then().block();
191
192         // then
193         verify(httpClient).call(httpRequestArgumentCaptor.capture());
194         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
195         assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
196                 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
197     }
198
199     @Test
200     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
201         // given
202         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
203         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
204
205         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
206         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
207
208         // when
209         final Flux<MessageRouterPublishResponse> responses = cut
210                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
211
212         // then
213         verifySingleResponse(parsedThreeMessages, responses);
214     }
215
216     @Test
217     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
218         // given
219         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
220         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
221
222         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
223         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
224
225         // when
226         final Flux<MessageRouterPublishResponse> responses = cut
227                 .put(plainPublishRequest, plainMessagesMaxBatch);
228
229         // then
230         verifySingleResponse(parsedThreeMessages, responses);
231     }
232
233     @Test
234     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
235         // given
236         final List<String> threePlainMessages = List.of("I", "like", "cookies");
237         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
238
239         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
240         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
241
242         // when
243         final Flux<MessageRouterPublishResponse> responses = cut
244                 .put(plainPublishRequest, plainMessagesMaxBatch);
245
246         // then
247         verifySingleResponse(parsedThreeMessages, responses);
248     }
249
250     @Test
251     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
252         // given
253         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
254         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
255
256         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
257         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
258
259         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
260                 threeJsonMessages.appendAll(twoJsonMessages));
261         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
262
263         // when
264         final Flux<MessageRouterPublishResponse> responses = cut
265                 .put(jsonPublishRequest, doubleJsonMessageBatch);
266         // then
267         responses.then().block();
268
269         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
270         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
271         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
272
273         final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
274         assertThat(firstRequest.size()).describedAs("Http request first batch size")
275                 .isEqualTo(MAX_BATCH_SIZE);
276         assertListsContainSameElements(firstRequest, parsedThreeMessages);
277
278         final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
279         assertThat(secondRequest.size()).describedAs("Http request second batch size")
280                 .isEqualTo(MAX_BATCH_SIZE - 1);
281         assertListsContainSameElements(secondRequest, parsedTwoMessages);
282     }
283
284     @Test
285     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
286         // given
287         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
288         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
289
290         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
291         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
292
293         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
294                 threeJsonMessages.appendAll(twoJsonMessages));
295         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
296
297         // when
298         final Flux<MessageRouterPublishResponse> responses = cut
299                 .put(plainPublishRequest, doublePlainMessageBatch);
300         // then
301         responses.then().block();
302
303         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
304         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
305         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
306
307         final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
308                 .map(JsonElement::getAsJsonObject);
309         assertThat(firstRequest.size()).describedAs("Http request first batch size")
310                 .isEqualTo(MAX_BATCH_SIZE);
311         assertListsContainSameElements(firstRequest, parsedThreeMessages);
312
313         final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
314                 .map(JsonElement::getAsJsonObject);
315         assertThat(secondRequest.size()).describedAs("Http request second batch size")
316                 .isEqualTo(MAX_BATCH_SIZE - 1);
317         assertListsContainSameElements(secondRequest, parsedTwoMessages);
318     }
319
320     @Test
321     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
322         // given
323         final List<String> threePlainMessages = List.of("I", "like", "cookies");
324         final List<String> twoPlainMessages = List.of("and", "pierogi");
325
326         final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
327         final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
328
329         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
330                 threePlainMessages.appendAll(twoPlainMessages));
331         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
332
333         // when
334         final Flux<MessageRouterPublishResponse> responses = cut
335                 .put(plainPublishRequest, doublePlainMessageBatch);
336         // then
337         responses.then().block();
338
339         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
340         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
341         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
342
343         final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
344                 .map(JsonElement::getAsJsonPrimitive);
345         assertThat(firstRequest.size()).describedAs("Http request first batch size")
346                 .isEqualTo(MAX_BATCH_SIZE);
347         assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
348
349         final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
350                 .map(JsonElement::getAsJsonPrimitive);
351         assertThat(secondRequest.size()).describedAs("Http request second batch size")
352                 .isEqualTo(MAX_BATCH_SIZE - 1);
353         assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
354     }
355
356     @Test
357     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
358         // given
359         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
360         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
361
362         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
363         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
364
365         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
366         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
367
368         // when
369         final Flux<MessageRouterPublishResponse> responses = cut
370                 .put(jsonPublishRequest, doubleJsonMessageBatch);
371
372         // then
373         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
374     }
375
376     @Test
377     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
378         // given
379         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
380         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
381
382         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
383         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
384
385         final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
386         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
387
388         // when
389         final Flux<MessageRouterPublishResponse> responses = cut
390                 .put(plainPublishRequest, doubleJsonMessageBatch);
391
392         // then
393         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
394     }
395
396     @Test
397     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
398         // given
399         final List<String> threePlainMessages = List.of("I", "like", "cookies");
400         final List<String> twoPlainMessages = List.of("and", "pierogi");
401
402         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
403         final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
404
405         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
406                 threePlainMessages.appendAll(twoPlainMessages));
407         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
408
409         // when
410         final Flux<MessageRouterPublishResponse> responses = cut
411                 .put(plainPublishRequest, doublePlainMessageBatch);
412
413         // then
414         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
415     }
416
417     @Test
418     void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
419         // given
420         final List<String> plainMessage = List.of("I", "like", "cookies");
421
422         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
423         given(clientErrorReasonPresenter.present(any()))
424                 .willReturn(ERROR_MESSAGE);
425         given(httpClient.call(any(HttpRequest.class)))
426                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
427
428         // when
429         final Flux<MessageRouterPublishResponse> responses = cut
430                 .put(plainPublishRequest, plainMessagesMaxBatch);
431
432         // then
433         StepVerifier.create(responses)
434                 .consumeNextWith(this::assertTimeoutError)
435                 .expectComplete()
436                 .verify(TIMEOUT);
437     }
438
439     @Test
440     void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
441         // given
442         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
443         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
444
445         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
446
447         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
448         given(clientErrorReasonPresenter.present(any()))
449                 .willReturn(ERROR_MESSAGE);
450         given(httpClient.call(any(HttpRequest.class)))
451                 .willReturn(Mono.just(successHttpResponse))
452                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
453
454         // when
455         final Flux<MessageRouterPublishResponse> responses = cut
456                 .put(jsonPublishRequest, doubleJsonMessageBatch);
457
458         // then
459         StepVerifier.create(responses)
460                 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
461                 .consumeNextWith(this::assertTimeoutError)
462                 .expectComplete()
463                 .verify(TIMEOUT);
464     }
465
466     @Test
467     void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
468         // given
469         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
470         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
471
472         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
473
474         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
475         given(clientErrorReasonPresenter.present(any()))
476                 .willReturn(ERROR_MESSAGE);
477         given(httpClient.call(any(HttpRequest.class)))
478                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
479                 .willReturn(Mono.just(successHttpResponse));
480
481         // when
482         final Flux<MessageRouterPublishResponse> responses = cut
483                 .put(jsonPublishRequest, doubleJsonMessageBatch);
484
485         // then
486         StepVerifier.create(responses)
487                 .consumeNextWith(this::assertTimeoutError)
488                 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
489                 .expectComplete()
490                 .verify(TIMEOUT);
491     }
492
493     private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
494         return plainTextMessages
495                 .map(message -> String.format("{\"message\":\"%s\"}", message));
496     }
497
498     private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
499         return ImmutableHttpResponse.builder()
500                 .statusCode(statusCode)
501                 .url(TOPIC_URL)
502                 .statusReason(statusReason)
503                 .rawBody("[]".getBytes())
504                 .build();
505     }
506
507     private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
508         final String body = Flux.from(httpRequest.body().contents())
509                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
510                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
511                 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
512                 .block();
513         assertThat(body).describedAs("request body").isNotBlank();
514
515         return body;
516     }
517
518     private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
519         return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
520     }
521
522     private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
523         return getAsJsonElements(
524                 List.of(
525                         collectNonEmptyRequestBody(httpRequest)
526                                 .split("\n")
527                 )
528         );
529     }
530
531     private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
532                                                 List<? extends JsonElement> expectedMessages) {
533         for (int i = 0; i < actualMessages.size(); i++) {
534             assertThat(actualMessages.get(i))
535                     .describedAs(String.format("Http request element at position %d", i))
536                     .isEqualTo(expectedMessages.get(i));
537         }
538     }
539
540     private void assertListsContainSameElements(JsonArray actualMessages,
541                                                 List<? extends JsonElement> expectedMessages) {
542         assertThat(actualMessages.size()).describedAs("Http request batch size")
543                 .isEqualTo(expectedMessages.size());
544
545         for (int i = 0; i < actualMessages.size(); i++) {
546             assertThat(actualMessages.get(i))
547                     .describedAs(String.format("Http request element at position %d", i))
548                     .isEqualTo(expectedMessages.get(i));
549         }
550     }
551
552     private void assertTimeoutError(MessageRouterPublishResponse response) {
553         assertThat(response.failed()).isTrue();
554         assertThat(response.items()).isEmpty();
555         assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
556     }
557
558     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
559                                       Flux<MessageRouterPublishResponse> responses) {
560         StepVerifier.create(responses)
561                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
562                 .expectComplete()
563                 .verify(TIMEOUT);
564     }
565
566     private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
567                                       List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
568         StepVerifier.create(responses)
569                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
570                 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
571                 .expectComplete()
572                 .verify(TIMEOUT);
573     }
574
575     private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
576         assertThat(response.successful()).describedAs("successful").isTrue();
577         JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
578         assertThat(response.items()).containsExactly(jsonElements);
579     }
580 }