6c6ded16d595fb969fe5f744979aaddf1c66e939
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.HashMultimap;
32 import io.vavr.collection.List;
33 import org.junit.jupiter.api.Test;
34 import org.mockito.ArgumentCaptor;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
40 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
46 import reactor.core.publisher.Flux;
47 import reactor.core.publisher.Mono;
48 import reactor.test.StepVerifier;
49
50 import java.net.ConnectException;
51 import java.nio.charset.StandardCharsets;
52 import java.time.Duration;
53
54 import static org.assertj.core.api.Assertions.assertThat;
55 import static org.mockito.ArgumentMatchers.any;
56 import static org.mockito.BDDMockito.given;
57 import static org.mockito.Mockito.mock;
58 import static org.mockito.Mockito.times;
59 import static org.mockito.Mockito.verify;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
66
67 /**
68  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
69  * @since April 2019
70  */
71 class MessageRouterPublisherImplTest {
72     private static final Duration TIMEOUT = Duration.ofSeconds(5);
73     private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
74     private static final int MAX_BATCH_SIZE = 3;
75     private static final String ERROR_MESSAGE = "Something went wrong";
76     private final RxHttpClient httpClient = mock(RxHttpClient.class);
77     private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
78     private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
79             httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
80     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
81     private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
82     private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
83     private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
84     private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
85
86     @Test
87     void puttingElementsShouldYieldNonChunkedHttpRequest() {
88         // given
89         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
90         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
91         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
92
93         // when
94         final Flux<MessageRouterPublishResponse> responses = cut
95                 .put(jsonPublishRequest, singleJsonMessageBatch);
96         responses.then().block();
97
98         // then
99         verify(httpClient).call(httpRequestArgumentCaptor.capture());
100         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
101         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
102         assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
103         assertThat(httpRequest.body()).isNotNull();
104         assertThat(httpRequest.body().length()).isPositive();
105     }
106
107     @Test
108     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
109         // given
110         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
111         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
112
113         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
114         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
115
116         // when
117         final Flux<MessageRouterPublishResponse> responses = cut
118                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
119         responses.then().block();
120
121         // then
122         verify(httpClient).call(httpRequestArgumentCaptor.capture());
123         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
124         final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
125
126         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
127                 .isEqualTo(MAX_BATCH_SIZE);
128         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
129     }
130
131
132     @Test
133     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
134         // given
135         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
136         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
137
138         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
139         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
140
141         // when
142         final Flux<MessageRouterPublishResponse> responses = cut
143                 .put(plainPublishRequest, plainMessagesMaxBatch);
144         responses.then().block();
145
146         // then
147         verify(httpClient).call(httpRequestArgumentCaptor.capture());
148         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
149         final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
150                 .map(JsonElement::getAsJsonObject);
151
152
153         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
154                 .isEqualTo(MAX_BATCH_SIZE);
155         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
156     }
157
158     @Test
159     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
160         // given
161         final List<String> threePlainMessages = List.of("I", "like", "cookies");
162         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
163
164         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
165         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
166
167         // when
168         final Flux<MessageRouterPublishResponse> responses = cut
169                 .put(plainPublishRequest, plainMessagesMaxBatch);
170         responses.then().block();
171
172         // then
173         verify(httpClient).call(httpRequestArgumentCaptor.capture());
174         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
175         final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
176                 .map(JsonElement::getAsJsonPrimitive);
177
178         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
179                 .isEqualTo(MAX_BATCH_SIZE);
180         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
181     }
182
183     @Test
184     void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
185         // given
186         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
187         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
188         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
189
190         // when
191         final Flux<MessageRouterPublishResponse> responses = cut
192                 .put(jsonPublishRequest, singleJsonMessageBatch);
193         responses.then().block();
194
195         // then
196         verify(httpClient).call(httpRequestArgumentCaptor.capture());
197         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
198         assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
199                 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
200     }
201
202     @Test
203     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
204         // given
205         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
206         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
207
208         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
209         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
210
211         // when
212         final Flux<MessageRouterPublishResponse> responses = cut
213                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
214
215         // then
216         verifySingleResponse(parsedThreeMessages, responses);
217     }
218
219     @Test
220     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
221         // given
222         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
223         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
224
225         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
226         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
227
228         // when
229         final Flux<MessageRouterPublishResponse> responses = cut
230                 .put(plainPublishRequest, plainMessagesMaxBatch);
231
232         // then
233         verifySingleResponse(parsedThreeMessages, responses);
234     }
235
236     @Test
237     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
238         // given
239         final List<String> threePlainMessages = List.of("I", "like", "cookies");
240         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
241
242         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
243         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
244
245         // when
246         final Flux<MessageRouterPublishResponse> responses = cut
247                 .put(plainPublishRequest, plainMessagesMaxBatch);
248
249         // then
250         verifySingleResponse(parsedThreeMessages, responses);
251     }
252
253     @Test
254     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
255         // given
256         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
257         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
258
259         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
260         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
261
262         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
263                 threeJsonMessages.appendAll(twoJsonMessages));
264         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
265
266         // when
267         final Flux<MessageRouterPublishResponse> responses = cut
268                 .put(jsonPublishRequest, doubleJsonMessageBatch);
269         // then
270         responses.then().block();
271
272         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
273         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
274         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
275
276         final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
277         assertThat(firstRequest.size()).describedAs("Http request first batch size")
278                 .isEqualTo(MAX_BATCH_SIZE);
279         assertListsContainSameElements(firstRequest, parsedThreeMessages);
280
281         final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
282         assertThat(secondRequest.size()).describedAs("Http request second batch size")
283                 .isEqualTo(MAX_BATCH_SIZE - 1);
284         assertListsContainSameElements(secondRequest, parsedTwoMessages);
285     }
286
287     @Test
288     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
289         // given
290         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
291         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
292
293         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
294         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
295
296         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
297                 threeJsonMessages.appendAll(twoJsonMessages));
298         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
299
300         // when
301         final Flux<MessageRouterPublishResponse> responses = cut
302                 .put(plainPublishRequest, doublePlainMessageBatch);
303         // then
304         responses.then().block();
305
306         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
307         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
308         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
309
310         final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
311                 .map(JsonElement::getAsJsonObject);
312         assertThat(firstRequest.size()).describedAs("Http request first batch size")
313                 .isEqualTo(MAX_BATCH_SIZE);
314         assertListsContainSameElements(firstRequest, parsedThreeMessages);
315
316         final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
317                 .map(JsonElement::getAsJsonObject);
318         assertThat(secondRequest.size()).describedAs("Http request second batch size")
319                 .isEqualTo(MAX_BATCH_SIZE - 1);
320         assertListsContainSameElements(secondRequest, parsedTwoMessages);
321     }
322
323     @Test
324     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
325         // given
326         final List<String> threePlainMessages = List.of("I", "like", "cookies");
327         final List<String> twoPlainMessages = List.of("and", "pierogi");
328
329         final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
330         final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
331
332         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
333                 threePlainMessages.appendAll(twoPlainMessages));
334         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
335
336         // when
337         final Flux<MessageRouterPublishResponse> responses = cut
338                 .put(plainPublishRequest, doublePlainMessageBatch);
339         // then
340         responses.then().block();
341
342         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
343         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
344         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
345
346         final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
347                 .map(JsonElement::getAsJsonPrimitive);
348         assertThat(firstRequest.size()).describedAs("Http request first batch size")
349                 .isEqualTo(MAX_BATCH_SIZE);
350         assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
351
352         final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
353                 .map(JsonElement::getAsJsonPrimitive);
354         assertThat(secondRequest.size()).describedAs("Http request second batch size")
355                 .isEqualTo(MAX_BATCH_SIZE - 1);
356         assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
357     }
358
359     @Test
360     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
361         // given
362         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
363         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
364
365         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
366         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
367
368         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
369         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
370
371         // when
372         final Flux<MessageRouterPublishResponse> responses = cut
373                 .put(jsonPublishRequest, doubleJsonMessageBatch);
374
375         // then
376         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
377     }
378
379     @Test
380     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
381         // given
382         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
383         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
384
385         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
386         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
387
388         final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
389         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
390
391         // when
392         final Flux<MessageRouterPublishResponse> responses = cut
393                 .put(plainPublishRequest, doubleJsonMessageBatch);
394
395         // then
396         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
397     }
398
399     @Test
400     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
401         // given
402         final List<String> threePlainMessages = List.of("I", "like", "cookies");
403         final List<String> twoPlainMessages = List.of("and", "pierogi");
404
405         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
406         final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
407
408         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
409                 threePlainMessages.appendAll(twoPlainMessages));
410         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
411
412         // when
413         final Flux<MessageRouterPublishResponse> responses = cut
414                 .put(plainPublishRequest, doublePlainMessageBatch);
415
416         // then
417         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
418     }
419
420     @Test
421     void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
422         // given
423         final List<String> plainMessage = List.of("I", "like", "cookies");
424
425         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
426         given(clientErrorReasonPresenter.present(any()))
427                 .willReturn(ERROR_MESSAGE);
428         given(httpClient.call(any(HttpRequest.class)))
429                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
430
431         // when
432         final Flux<MessageRouterPublishResponse> responses = cut
433                 .put(plainPublishRequest, plainMessagesMaxBatch);
434
435         // then
436         StepVerifier.create(responses)
437                 .consumeNextWith(this::assertFailedResponse)
438                 .expectComplete()
439                 .verify(TIMEOUT);
440     }
441
442     @Test
443     void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
444         // given
445         final List<String> plainMessage = List.of("I", "like", "cookies");
446
447         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
448         given(clientErrorReasonPresenter.present(any()))
449                 .willReturn(ERROR_MESSAGE);
450         given(httpClient.call(any(HttpRequest.class)))
451                 .willReturn(Mono.error(new ConnectException()));
452
453         // when
454         final Flux<MessageRouterPublishResponse> responses = cut
455                 .put(plainPublishRequest, plainMessagesMaxBatch);
456
457         // then
458         StepVerifier.create(responses)
459                 .consumeNextWith(this::assertFailedResponse)
460                 .expectComplete()
461                 .verify(TIMEOUT);
462     }
463
464     @Test
465     void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
466         // given
467         final List<String> plainMessage = List.of("I", "like", "cookies");
468
469         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
470         given(httpClient.call(any(HttpRequest.class)))
471                 .willReturn(Mono.just(retryableHttpResponse));
472
473         // when
474         final Flux<MessageRouterPublishResponse> responses = cut
475                 .put(plainPublishRequest, plainMessagesMaxBatch);
476
477         // then
478         StepVerifier.create(responses)
479                 .consumeNextWith(this::assertRetryableFailedResponse)
480                 .expectComplete()
481                 .verify(TIMEOUT);
482     }
483
484     @Test
485     void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
486         // given
487         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
488         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
489
490         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
491
492         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
493         given(clientErrorReasonPresenter.present(any()))
494                 .willReturn(ERROR_MESSAGE);
495         given(httpClient.call(any(HttpRequest.class)))
496                 .willReturn(Mono.just(successHttpResponse))
497                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
498
499         // when
500         final Flux<MessageRouterPublishResponse> responses = cut
501                 .put(jsonPublishRequest, doubleJsonMessageBatch);
502
503         // then
504         StepVerifier.create(responses)
505                 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
506                 .consumeNextWith(this::assertFailedResponse)
507                 .expectComplete()
508                 .verify(TIMEOUT);
509     }
510
511     @Test
512     void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
513         // given
514         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
515         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
516
517         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
518
519         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
520         given(clientErrorReasonPresenter.present(any()))
521                 .willReturn(ERROR_MESSAGE);
522         given(httpClient.call(any(HttpRequest.class)))
523                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
524                 .willReturn(Mono.just(successHttpResponse));
525
526         // when
527         final Flux<MessageRouterPublishResponse> responses = cut
528                 .put(jsonPublishRequest, doubleJsonMessageBatch);
529
530         // then
531         StepVerifier.create(responses)
532                 .consumeNextWith(this::assertFailedResponse)
533                 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
534                 .expectComplete()
535                 .verify(TIMEOUT);
536     }
537
538     private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
539         return plainTextMessages
540                 .map(message -> String.format("{\"message\":\"%s\"}", message));
541     }
542
543     private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
544         return ImmutableHttpResponse.builder()
545                 .statusCode(statusCode)
546                 .url(TOPIC_URL)
547                 .statusReason(statusReason)
548                 .rawBody("[]".getBytes())
549                 .headers(HashMultimap.withSeq().empty())
550                 .build();
551     }
552
553     private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
554         final String body = Flux.from(httpRequest.body().contents())
555                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
556                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
557                 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
558                 .block();
559         assertThat(body).describedAs("request body").isNotBlank();
560
561         return body;
562     }
563
564     private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
565         return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
566     }
567
568     private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
569         return getAsJsonElements(
570                 List.of(
571                         collectNonEmptyRequestBody(httpRequest)
572                                 .split("\n")
573                 )
574         );
575     }
576
577     private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
578                                                 List<? extends JsonElement> expectedMessages) {
579         for (int i = 0; i < actualMessages.size(); i++) {
580             assertThat(actualMessages.get(i))
581                     .describedAs(String.format("Http request element at position %d", i))
582                     .isEqualTo(expectedMessages.get(i));
583         }
584     }
585
586     private void assertListsContainSameElements(JsonArray actualMessages,
587                                                 List<? extends JsonElement> expectedMessages) {
588         assertThat(actualMessages.size()).describedAs("Http request batch size")
589                 .isEqualTo(expectedMessages.size());
590
591         for (int i = 0; i < actualMessages.size(); i++) {
592             assertThat(actualMessages.get(i))
593                     .describedAs(String.format("Http request element at position %d", i))
594                     .isEqualTo(expectedMessages.get(i));
595         }
596     }
597
598     private void assertFailedResponse(MessageRouterPublishResponse response) {
599         assertThat(response.failed()).isTrue();
600         assertThat(response.items()).isEmpty();
601         assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
602     }
603
604     private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
605         assertThat(response.failed()).isTrue();
606         assertThat(response.items()).isEmpty();
607         assertThat(response.failReason()).startsWith("500 ERROR");
608     }
609
610     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
611                                       Flux<MessageRouterPublishResponse> responses) {
612         StepVerifier.create(responses)
613                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
614                 .expectComplete()
615                 .verify(TIMEOUT);
616     }
617
618     private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
619                                       List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
620         StepVerifier.create(responses)
621                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
622                 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
623                 .expectComplete()
624                 .verify(TIMEOUT);
625     }
626
627     private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
628         assertThat(response.successful()).describedAs("successful").isTrue();
629         JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
630         assertThat(response.items()).containsExactly(jsonElements);
631     }
632 }