2fde441dccc979f74868506b0773a266dc4c44ef
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.List;
32 import org.junit.jupiter.api.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
48
49 import java.net.ConnectException;
50 import java.nio.charset.StandardCharsets;
51 import java.time.Duration;
52
53 import static org.assertj.core.api.Assertions.assertThat;
54 import static org.mockito.ArgumentMatchers.any;
55 import static org.mockito.BDDMockito.given;
56 import static org.mockito.Mockito.mock;
57 import static org.mockito.Mockito.times;
58 import static org.mockito.Mockito.verify;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
65
66 /**
67  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
68  * @since April 2019
69  */
70 class MessageRouterPublisherImplTest {
71     private static final Duration TIMEOUT = Duration.ofSeconds(5);
72     private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
73     private static final int MAX_BATCH_SIZE = 3;
74     private static final String ERROR_MESSAGE = "Something went wrong";
75     private final RxHttpClient httpClient = mock(RxHttpClient.class);
76     private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
77     private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
78             httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
79     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
80     private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
81     private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
82     private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
83     private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
84
85     @Test
86     void puttingElementsShouldYieldNonChunkedHttpRequest() {
87         // given
88         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
89         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
90         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
91
92         // when
93         final Flux<MessageRouterPublishResponse> responses = cut
94                 .put(jsonPublishRequest, singleJsonMessageBatch);
95         responses.then().block();
96
97         // then
98         verify(httpClient).call(httpRequestArgumentCaptor.capture());
99         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
100         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
101         assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
102         assertThat(httpRequest.body()).isNotNull();
103         assertThat(httpRequest.body().length()).isPositive();
104     }
105
106     @Test
107     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
108         // given
109         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
110         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
111
112         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
113         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
114
115         // when
116         final Flux<MessageRouterPublishResponse> responses = cut
117                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
118         responses.then().block();
119
120         // then
121         verify(httpClient).call(httpRequestArgumentCaptor.capture());
122         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
123         final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
124
125         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
126                 .isEqualTo(MAX_BATCH_SIZE);
127         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
128     }
129
130
131     @Test
132     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
133         // given
134         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
135         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
136
137         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
138         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
139
140         // when
141         final Flux<MessageRouterPublishResponse> responses = cut
142                 .put(plainPublishRequest, plainMessagesMaxBatch);
143         responses.then().block();
144
145         // then
146         verify(httpClient).call(httpRequestArgumentCaptor.capture());
147         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
148         final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
149                 .map(JsonElement::getAsJsonObject);
150
151
152         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
153                 .isEqualTo(MAX_BATCH_SIZE);
154         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
155     }
156
157     @Test
158     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
159         // given
160         final List<String> threePlainMessages = List.of("I", "like", "cookies");
161         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
162
163         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
164         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
165
166         // when
167         final Flux<MessageRouterPublishResponse> responses = cut
168                 .put(plainPublishRequest, plainMessagesMaxBatch);
169         responses.then().block();
170
171         // then
172         verify(httpClient).call(httpRequestArgumentCaptor.capture());
173         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
174         final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
175                 .map(JsonElement::getAsJsonPrimitive);
176
177         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
178                 .isEqualTo(MAX_BATCH_SIZE);
179         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
180     }
181
182     @Test
183     void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
184         // given
185         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
186         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
187         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
188
189         // when
190         final Flux<MessageRouterPublishResponse> responses = cut
191                 .put(jsonPublishRequest, singleJsonMessageBatch);
192         responses.then().block();
193
194         // then
195         verify(httpClient).call(httpRequestArgumentCaptor.capture());
196         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
197         assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
198                 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
199     }
200
201     @Test
202     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
203         // given
204         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
205         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
206
207         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
208         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
209
210         // when
211         final Flux<MessageRouterPublishResponse> responses = cut
212                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
213
214         // then
215         verifySingleResponse(parsedThreeMessages, responses);
216     }
217
218     @Test
219     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
220         // given
221         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
222         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
223
224         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
225         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
226
227         // when
228         final Flux<MessageRouterPublishResponse> responses = cut
229                 .put(plainPublishRequest, plainMessagesMaxBatch);
230
231         // then
232         verifySingleResponse(parsedThreeMessages, responses);
233     }
234
235     @Test
236     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
237         // given
238         final List<String> threePlainMessages = List.of("I", "like", "cookies");
239         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
240
241         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
242         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
243
244         // when
245         final Flux<MessageRouterPublishResponse> responses = cut
246                 .put(plainPublishRequest, plainMessagesMaxBatch);
247
248         // then
249         verifySingleResponse(parsedThreeMessages, responses);
250     }
251
252     @Test
253     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
254         // given
255         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
256         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
257
258         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
259         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
260
261         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
262                 threeJsonMessages.appendAll(twoJsonMessages));
263         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
264
265         // when
266         final Flux<MessageRouterPublishResponse> responses = cut
267                 .put(jsonPublishRequest, doubleJsonMessageBatch);
268         // then
269         responses.then().block();
270
271         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
272         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
273         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
274
275         final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
276         assertThat(firstRequest.size()).describedAs("Http request first batch size")
277                 .isEqualTo(MAX_BATCH_SIZE);
278         assertListsContainSameElements(firstRequest, parsedThreeMessages);
279
280         final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
281         assertThat(secondRequest.size()).describedAs("Http request second batch size")
282                 .isEqualTo(MAX_BATCH_SIZE - 1);
283         assertListsContainSameElements(secondRequest, parsedTwoMessages);
284     }
285
286     @Test
287     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
288         // given
289         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
290         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
291
292         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
293         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
294
295         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
296                 threeJsonMessages.appendAll(twoJsonMessages));
297         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
298
299         // when
300         final Flux<MessageRouterPublishResponse> responses = cut
301                 .put(plainPublishRequest, doublePlainMessageBatch);
302         // then
303         responses.then().block();
304
305         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
306         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
307         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
308
309         final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
310                 .map(JsonElement::getAsJsonObject);
311         assertThat(firstRequest.size()).describedAs("Http request first batch size")
312                 .isEqualTo(MAX_BATCH_SIZE);
313         assertListsContainSameElements(firstRequest, parsedThreeMessages);
314
315         final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
316                 .map(JsonElement::getAsJsonObject);
317         assertThat(secondRequest.size()).describedAs("Http request second batch size")
318                 .isEqualTo(MAX_BATCH_SIZE - 1);
319         assertListsContainSameElements(secondRequest, parsedTwoMessages);
320     }
321
322     @Test
323     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
324         // given
325         final List<String> threePlainMessages = List.of("I", "like", "cookies");
326         final List<String> twoPlainMessages = List.of("and", "pierogi");
327
328         final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
329         final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
330
331         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
332                 threePlainMessages.appendAll(twoPlainMessages));
333         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
334
335         // when
336         final Flux<MessageRouterPublishResponse> responses = cut
337                 .put(plainPublishRequest, doublePlainMessageBatch);
338         // then
339         responses.then().block();
340
341         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
342         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
343         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
344
345         final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
346                 .map(JsonElement::getAsJsonPrimitive);
347         assertThat(firstRequest.size()).describedAs("Http request first batch size")
348                 .isEqualTo(MAX_BATCH_SIZE);
349         assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
350
351         final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
352                 .map(JsonElement::getAsJsonPrimitive);
353         assertThat(secondRequest.size()).describedAs("Http request second batch size")
354                 .isEqualTo(MAX_BATCH_SIZE - 1);
355         assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
356     }
357
358     @Test
359     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
360         // given
361         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
362         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
363
364         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
365         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
366
367         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
368         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
369
370         // when
371         final Flux<MessageRouterPublishResponse> responses = cut
372                 .put(jsonPublishRequest, doubleJsonMessageBatch);
373
374         // then
375         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
376     }
377
378     @Test
379     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
380         // given
381         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
382         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
383
384         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
385         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
386
387         final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
388         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
389
390         // when
391         final Flux<MessageRouterPublishResponse> responses = cut
392                 .put(plainPublishRequest, doubleJsonMessageBatch);
393
394         // then
395         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
396     }
397
398     @Test
399     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
400         // given
401         final List<String> threePlainMessages = List.of("I", "like", "cookies");
402         final List<String> twoPlainMessages = List.of("and", "pierogi");
403
404         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
405         final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
406
407         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
408                 threePlainMessages.appendAll(twoPlainMessages));
409         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
410
411         // when
412         final Flux<MessageRouterPublishResponse> responses = cut
413                 .put(plainPublishRequest, doublePlainMessageBatch);
414
415         // then
416         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
417     }
418
419     @Test
420     void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
421         // given
422         final List<String> plainMessage = List.of("I", "like", "cookies");
423
424         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
425         given(clientErrorReasonPresenter.present(any()))
426                 .willReturn(ERROR_MESSAGE);
427         given(httpClient.call(any(HttpRequest.class)))
428                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
429
430         // when
431         final Flux<MessageRouterPublishResponse> responses = cut
432                 .put(plainPublishRequest, plainMessagesMaxBatch);
433
434         // then
435         StepVerifier.create(responses)
436                 .consumeNextWith(this::assertFailedResponse)
437                 .expectComplete()
438                 .verify(TIMEOUT);
439     }
440
441     @Test
442     void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
443         // given
444         final List<String> plainMessage = List.of("I", "like", "cookies");
445
446         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
447         given(clientErrorReasonPresenter.present(any()))
448                 .willReturn(ERROR_MESSAGE);
449         given(httpClient.call(any(HttpRequest.class)))
450                 .willReturn(Mono.error(new ConnectException()));
451
452         // when
453         final Flux<MessageRouterPublishResponse> responses = cut
454                 .put(plainPublishRequest, plainMessagesMaxBatch);
455
456         // then
457         StepVerifier.create(responses)
458                 .consumeNextWith(this::assertFailedResponse)
459                 .expectComplete()
460                 .verify(TIMEOUT);
461     }
462
463     @Test
464     void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
465         // given
466         final List<String> plainMessage = List.of("I", "like", "cookies");
467
468         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
469         given(httpClient.call(any(HttpRequest.class)))
470                 .willReturn(Mono.just(retryableHttpResponse));
471
472         // when
473         final Flux<MessageRouterPublishResponse> responses = cut
474                 .put(plainPublishRequest, plainMessagesMaxBatch);
475
476         // then
477         StepVerifier.create(responses)
478                 .consumeNextWith(this::assertRetryableFailedResponse)
479                 .expectComplete()
480                 .verify(TIMEOUT);
481     }
482
483     @Test
484     void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
485         // given
486         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
487         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
488
489         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
490
491         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
492         given(clientErrorReasonPresenter.present(any()))
493                 .willReturn(ERROR_MESSAGE);
494         given(httpClient.call(any(HttpRequest.class)))
495                 .willReturn(Mono.just(successHttpResponse))
496                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
497
498         // when
499         final Flux<MessageRouterPublishResponse> responses = cut
500                 .put(jsonPublishRequest, doubleJsonMessageBatch);
501
502         // then
503         StepVerifier.create(responses)
504                 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
505                 .consumeNextWith(this::assertFailedResponse)
506                 .expectComplete()
507                 .verify(TIMEOUT);
508     }
509
510     @Test
511     void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
512         // given
513         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
514         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
515
516         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
517
518         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
519         given(clientErrorReasonPresenter.present(any()))
520                 .willReturn(ERROR_MESSAGE);
521         given(httpClient.call(any(HttpRequest.class)))
522                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
523                 .willReturn(Mono.just(successHttpResponse));
524
525         // when
526         final Flux<MessageRouterPublishResponse> responses = cut
527                 .put(jsonPublishRequest, doubleJsonMessageBatch);
528
529         // then
530         StepVerifier.create(responses)
531                 .consumeNextWith(this::assertFailedResponse)
532                 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
533                 .expectComplete()
534                 .verify(TIMEOUT);
535     }
536
537     private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
538         return plainTextMessages
539                 .map(message -> String.format("{\"message\":\"%s\"}", message));
540     }
541
542     private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
543         return ImmutableHttpResponse.builder()
544                 .statusCode(statusCode)
545                 .url(TOPIC_URL)
546                 .statusReason(statusReason)
547                 .rawBody("[]".getBytes())
548                 .build();
549     }
550
551     private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
552         final String body = Flux.from(httpRequest.body().contents())
553                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
554                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
555                 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
556                 .block();
557         assertThat(body).describedAs("request body").isNotBlank();
558
559         return body;
560     }
561
562     private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
563         return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
564     }
565
566     private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
567         return getAsJsonElements(
568                 List.of(
569                         collectNonEmptyRequestBody(httpRequest)
570                                 .split("\n")
571                 )
572         );
573     }
574
575     private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
576                                                 List<? extends JsonElement> expectedMessages) {
577         for (int i = 0; i < actualMessages.size(); i++) {
578             assertThat(actualMessages.get(i))
579                     .describedAs(String.format("Http request element at position %d", i))
580                     .isEqualTo(expectedMessages.get(i));
581         }
582     }
583
584     private void assertListsContainSameElements(JsonArray actualMessages,
585                                                 List<? extends JsonElement> expectedMessages) {
586         assertThat(actualMessages.size()).describedAs("Http request batch size")
587                 .isEqualTo(expectedMessages.size());
588
589         for (int i = 0; i < actualMessages.size(); i++) {
590             assertThat(actualMessages.get(i))
591                     .describedAs(String.format("Http request element at position %d", i))
592                     .isEqualTo(expectedMessages.get(i));
593         }
594     }
595
596     private void assertFailedResponse(MessageRouterPublishResponse response) {
597         assertThat(response.failed()).isTrue();
598         assertThat(response.items()).isEmpty();
599         assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
600     }
601
602     private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
603         assertThat(response.failed()).isTrue();
604         assertThat(response.items()).isEmpty();
605         assertThat(response.failReason()).startsWith("500 ERROR");
606     }
607
608     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
609                                       Flux<MessageRouterPublishResponse> responses) {
610         StepVerifier.create(responses)
611                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
612                 .expectComplete()
613                 .verify(TIMEOUT);
614     }
615
616     private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
617                                       List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
618         StepVerifier.create(responses)
619                 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
620                 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
621                 .expectComplete()
622                 .verify(TIMEOUT);
623     }
624
625     private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
626         assertThat(response.successful()).describedAs("successful").isTrue();
627         JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
628         assertThat(response.items()).containsExactly(jsonElements);
629     }
630 }