f29bfa2759bc851dcfe05efb1427758f4de5613b
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / impl / MessageRouterPublisherImplTest.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2020 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.List;
32 import org.junit.jupiter.api.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
44 import reactor.core.publisher.Flux;
45 import reactor.core.publisher.Mono;
46 import reactor.test.StepVerifier;
47
48 import java.nio.charset.StandardCharsets;
49 import java.time.Duration;
50
51 import static org.assertj.core.api.Assertions.assertThat;
52 import static org.mockito.ArgumentMatchers.any;
53 import static org.mockito.BDDMockito.given;
54 import static org.mockito.Mockito.mock;
55 import static org.mockito.Mockito.times;
56 import static org.mockito.Mockito.verify;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
63
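/**
 * Unit tests for {@link MessageRouterPublisherImpl}: they check how published messages are batched
 * into HTTP requests and how HTTP results, including read timeouts, are turned into publish responses.
 *
 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
 * @since April 2019
 */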
68 class MessageRouterPublisherImplTest {
    // Upper bound passed to StepVerifier#verify when checking the response streams.
    private static final Duration TIMEOUT = Duration.ofSeconds(5);
    // Topic URL used for both publish requests and for the stubbed HTTP response.
    private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
    // Second constructor argument of the publisher under test; the tests expect at most this many
    // messages in a single HTTP request.
    private static final int MAX_BATCH_SIZE = 3;
    // Prefix that assertTimeoutError expects at the start of a failed response's fail reason.
    public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout";
    // Mocked HTTP client; every test stubs call(...) with the response(s) it needs.
    private final RxHttpClient httpClient = mock(RxHttpClient.class);
    // Class under test. NOTE(review): the meaning of the Duration argument is not visible in this
    // file - presumably a timeout; confirm against MessageRouterPublisherImpl.
    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
    // Captures the HttpRequest instances handed to httpClient.call(...).
    private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
    // Publish request created with an explicit TEXT_PLAIN content type.
    private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
    // Publish request created without an explicit content type.
    private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
    // Canned successful response (status 200, reason "OK") returned by the mocked client.
    private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
79
80     @Test
81     void puttingElementsShouldYieldNonChunkedHttpRequest() {
82         // given
83         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
84         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
85         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
86
87         // when
88         final Flux<MessageRouterPublishResponse> responses = cut
89                 .put(jsonPublishRequest, singleJsonMessageBatch);
90         responses.then().block();
91
92         // then
93         verify(httpClient).call(httpRequestArgumentCaptor.capture());
94         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
95         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
96         assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
97         assertThat(httpRequest.body()).isNotNull();
98         assertThat(httpRequest.body().length()).isPositive();
99     }
100
101     @Test
102     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
103         // given
104         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
105         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
106
107         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
108         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
109
110         // when
111         final Flux<MessageRouterPublishResponse> responses = cut
112                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
113         responses.then().block();
114
115         // then
116         verify(httpClient).call(httpRequestArgumentCaptor.capture());
117         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
118         final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
119
120         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
121                 .isEqualTo(MAX_BATCH_SIZE);
122         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
123     }
124
125
126     @Test
127     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
128         // given
129         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
130         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
131
132         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
133         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
134
135         // when
136         final Flux<MessageRouterPublishResponse> responses = cut
137                 .put(plainPublishRequest, plainMessagesMaxBatch);
138         responses.then().block();
139
140         // then
141         verify(httpClient).call(httpRequestArgumentCaptor.capture());
142         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
143         final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
144                 .map(JsonElement::getAsJsonObject);
145
146
147         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
148                 .isEqualTo(MAX_BATCH_SIZE);
149         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
150     }
151
152     @Test
153     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
154         // given
155         final List<String> threePlainMessages = List.of("I", "like", "cookies");
156         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
157
158         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
159         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
160
161         // when
162         final Flux<MessageRouterPublishResponse> responses = cut
163                 .put(plainPublishRequest, plainMessagesMaxBatch);
164         responses.then().block();
165
166         // then
167         verify(httpClient).call(httpRequestArgumentCaptor.capture());
168         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
169         final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
170                 .map(JsonElement::getAsJsonPrimitive);
171
172         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
173                 .isEqualTo(MAX_BATCH_SIZE);
174         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
175     }
176
177     @Test
178     void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
179         // given
180         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
181         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
182         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
183
184         // when
185         final Flux<MessageRouterPublishResponse> responses = cut
186                 .put(jsonPublishRequest, singleJsonMessageBatch);
187         responses.then().block();
188
189         // then
190         verify(httpClient).call(httpRequestArgumentCaptor.capture());
191         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
192         assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
193                 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
194     }
195
196     @Test
197     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
198         // given
199         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
200         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
201
202         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
203         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
204
205         // when
206         final Flux<MessageRouterPublishResponse> responses = cut
207                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
208
209         // then
210         verifySingleResponse(parsedThreeMessages, responses);
211     }
212
213     @Test
214     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
215         // given
216         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
217         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
218
219         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
220         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
221
222         // when
223         final Flux<MessageRouterPublishResponse> responses = cut
224                 .put(plainPublishRequest, plainMessagesMaxBatch);
225
226         // then
227         verifySingleResponse(parsedThreeMessages, responses);
228     }
229
230     @Test
231     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
232         // given
233         final List<String> threePlainMessages = List.of("I", "like", "cookies");
234         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
235
236         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
237         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
238
239         // when
240         final Flux<MessageRouterPublishResponse> responses = cut
241                 .put(plainPublishRequest, plainMessagesMaxBatch);
242
243         // then
244         verifySingleResponse(parsedThreeMessages, responses);
245     }
246
247     @Test
248     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
249         // given
250         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
251         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
252
253         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
254         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
255
256         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
257                 threeJsonMessages.appendAll(twoJsonMessages));
258         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
259
260         // when
261         final Flux<MessageRouterPublishResponse> responses = cut
262                 .put(jsonPublishRequest, doubleJsonMessageBatch);
263         // then
264         responses.then().block();
265
266         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
267         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
268         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
269
270         final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
271         assertThat(firstRequest.size()).describedAs("Http request first batch size")
272                 .isEqualTo(MAX_BATCH_SIZE);
273         assertListsContainSameElements(firstRequest, parsedThreeMessages);
274
275         final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
276         assertThat(secondRequest.size()).describedAs("Http request second batch size")
277                 .isEqualTo(MAX_BATCH_SIZE - 1);
278         assertListsContainSameElements(secondRequest, parsedTwoMessages);
279     }
280
281     @Test
282     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
283         // given
284         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
285         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
286
287         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
288         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
289
290         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
291                 threeJsonMessages.appendAll(twoJsonMessages));
292         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
293
294         // when
295         final Flux<MessageRouterPublishResponse> responses = cut
296                 .put(plainPublishRequest, doublePlainMessageBatch);
297         // then
298         responses.then().block();
299
300         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
301         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
302         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
303
304         final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
305                 .map(JsonElement::getAsJsonObject);
306         assertThat(firstRequest.size()).describedAs("Http request first batch size")
307                 .isEqualTo(MAX_BATCH_SIZE);
308         assertListsContainSameElements(firstRequest, parsedThreeMessages);
309
310         final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
311                 .map(JsonElement::getAsJsonObject);
312         assertThat(secondRequest.size()).describedAs("Http request second batch size")
313                 .isEqualTo(MAX_BATCH_SIZE - 1);
314         assertListsContainSameElements(secondRequest, parsedTwoMessages);
315     }
316
317     @Test
318     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
319         // given
320         final List<String> threePlainMessages = List.of("I", "like", "cookies");
321         final List<String> twoPlainMessages = List.of("and", "pierogi");
322
323         final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
324         final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
325
326         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
327                 threePlainMessages.appendAll(twoPlainMessages));
328         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
329
330         // when
331         final Flux<MessageRouterPublishResponse> responses = cut
332                 .put(plainPublishRequest, doublePlainMessageBatch);
333         // then
334         responses.then().block();
335
336         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
337         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
338         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
339
340         final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
341                 .map(JsonElement::getAsJsonPrimitive);
342         assertThat(firstRequest.size()).describedAs("Http request first batch size")
343                 .isEqualTo(MAX_BATCH_SIZE);
344         assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
345
346         final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
347                 .map(JsonElement::getAsJsonPrimitive);
348         assertThat(secondRequest.size()).describedAs("Http request second batch size")
349                 .isEqualTo(MAX_BATCH_SIZE - 1);
350         assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
351     }
352
353     @Test
354     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
355         // given
356         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
357         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
358
359         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
360         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
361
362         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
363         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
364
365         // when
366         final Flux<MessageRouterPublishResponse> responses = cut
367                 .put(jsonPublishRequest, doubleJsonMessageBatch);
368
369         // then
370         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
371     }
372
373     @Test
374     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
375         // given
376         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
377         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
378
379         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
380         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
381
382         final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
383         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
384
385         // when
386         final Flux<MessageRouterPublishResponse> responses = cut
387                 .put(plainPublishRequest, doubleJsonMessageBatch);
388
389         // then
390         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
391     }
392
393     @Test
394     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
395         // given
396         final List<String> threePlainMessages = List.of("I", "like", "cookies");
397         final List<String> twoPlainMessages = List.of("and", "pierogi");
398
399         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
400         final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
401
402         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
403                 threePlainMessages.appendAll(twoPlainMessages));
404         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
405
406         // when
407         final Flux<MessageRouterPublishResponse> responses = cut
408                 .put(plainPublishRequest, doublePlainMessageBatch);
409
410         // then
411         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
412     }
413
414     @Test
415     void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
416         // given
417         final List<String> plainMessage = List.of("I", "like", "cookies");
418
419         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
420         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
421
422         // when
423         final Flux<MessageRouterPublishResponse> responses = cut
424                 .put(plainPublishRequest, plainMessagesMaxBatch);
425
426         // then
427         StepVerifier.create(responses)
428                 .consumeNextWith(this::assertTimeoutError)
429                 .expectComplete()
430                 .verify(TIMEOUT);
431     }
432
433     @Test
434     void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
435         // given
436         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
437         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
438
439         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
440
441         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
442         given(httpClient.call(any(HttpRequest.class)))
443                 .willReturn(Mono.just(successHttpResponse))
444                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
445         // when
446         final Flux<MessageRouterPublishResponse> responses = cut
447                 .put(jsonPublishRequest, doubleJsonMessageBatch);
448
449         // then
450         StepVerifier.create(responses)
451                 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
452                 .consumeNextWith(this::assertTimeoutError)
453                 .expectComplete()
454                 .verify(TIMEOUT);
455     }
456
457     @Test
458     void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
459         // given
460         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
461         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
462
463         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
464
465         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
466         given(httpClient.call(any(HttpRequest.class)))
467                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
468                 .willReturn(Mono.just(successHttpResponse));
469         // when
470         final Flux<MessageRouterPublishResponse> responses = cut
471                 .put(jsonPublishRequest, doubleJsonMessageBatch);
472
473         // then
474         StepVerifier.create(responses)
475                 .consumeNextWith(this::assertTimeoutError)
476                 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
477                 .expectComplete()
478                 .verify(TIMEOUT);
479     }
480
481     private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
482         return plainTextMessages
483                 .map(message -> String.format("{\"message\":\"%s\"}", message));
484     }
485
486     private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
487         return ImmutableHttpResponse.builder()
488                 .statusCode(statusCode)
489                 .url(TOPIC_URL)
490                 .statusReason(statusReason)
491                 .rawBody("[]".getBytes())
492                 .build();
493     }
494
495     private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
496         final String body = Flux.from(httpRequest.body().contents())
497                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
498                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
499                 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
500                 .block();
501         assertThat(body).describedAs("request body").isNotBlank();
502
503         return body;
504     }
505
506     private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
507         return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
508     }
509
510     private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
511         return getAsJsonElements(
512                 List.of(
513                         collectNonEmptyRequestBody(httpRequest)
514                                 .split("\n")
515                 )
516         );
517     }
518
519     private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
520                                                 List<? extends JsonElement> expectedMessages) {
521         for (int i = 0; i < actualMessages.size(); i++) {
522             assertThat(actualMessages.get(i))
523                     .describedAs(String.format("Http request element at position %d", i))
524                     .isEqualTo(expectedMessages.get(i));
525         }
526     }
527
528     private void assertListsContainSameElements(JsonArray actualMessages,
529                                                 List<? extends JsonElement> expectedMessages) {
530         assertThat(actualMessages.size()).describedAs("Http request batch size")
531                 .isEqualTo(expectedMessages.size());
532
533         for (int i = 0; i < actualMessages.size(); i++) {
534             assertThat(actualMessages.get(i))
535                     .describedAs(String.format("Http request element at position %d", i))
536                     .isEqualTo(expectedMessages.get(i));
537         }
538     }
539
540     private void assertTimeoutError(MessageRouterPublishResponse response) {
541         assertThat(response.failed()).isTrue();
542         assertThat(response.items()).isEmpty();
543         assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER);
544     }
545
546     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
547                                       Flux<MessageRouterPublishResponse> responses) {
548         StepVerifier.create(responses)
549                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
550                 .expectComplete()
551                 .verify(TIMEOUT);
552     }
553
554     private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
555                                       List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
556         StepVerifier.create(responses)
557                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
558                 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
559                 .expectComplete()
560                 .verify(TIMEOUT);
561     }
562
563     private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
564         assertThat(response.successful()).describedAs("successful").isTrue();
565         JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
566         assertThat(response.items()).containsExactly(jsonElements);
567     }
568 }