2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2020 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.List;
32 import org.junit.jupiter.api.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
44 import reactor.core.publisher.Flux;
45 import reactor.core.publisher.Mono;
46 import reactor.test.StepVerifier;
48 import java.nio.charset.StandardCharsets;
49 import java.time.Duration;
51 import static org.assertj.core.api.Assertions.assertThat;
52 import static org.mockito.ArgumentMatchers.any;
53 import static org.mockito.BDDMockito.given;
54 import static org.mockito.Mockito.mock;
55 import static org.mockito.Mockito.times;
56 import static org.mockito.Mockito.verify;
57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
65 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
68 class MessageRouterPublisherImplTest {
69 private static final Duration TIMEOUT = Duration.ofSeconds(5);
70 private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
71 private static final int MAX_BATCH_SIZE = 3;
72 public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout";
73 private final RxHttpClient httpClient = mock(RxHttpClient.class);
74 private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
75 private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
76 private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
77 private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
78 private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
81 void puttingElementsShouldYieldNonChunkedHttpRequest() {
83 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
84 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
85 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
88 final Flux<MessageRouterPublishResponse> responses = cut
89 .put(jsonPublishRequest, singleJsonMessageBatch);
90 responses.then().block();
93 verify(httpClient).call(httpRequestArgumentCaptor.capture());
94 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
95 assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
96 assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
97 assertThat(httpRequest.body()).isNotNull();
98 assertThat(httpRequest.body().length()).isPositive();
102 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
104 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
105 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
107 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
108 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
111 final Flux<MessageRouterPublishResponse> responses = cut
112 .put(jsonPublishRequest, jsonMessagesMaxBatch);
113 responses.then().block();
116 verify(httpClient).call(httpRequestArgumentCaptor.capture());
117 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
118 final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
120 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
121 .isEqualTo(MAX_BATCH_SIZE);
122 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
127 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
129 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
130 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
132 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
133 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
136 final Flux<MessageRouterPublishResponse> responses = cut
137 .put(plainPublishRequest, plainMessagesMaxBatch);
138 responses.then().block();
141 verify(httpClient).call(httpRequestArgumentCaptor.capture());
142 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
143 final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
144 .map(JsonElement::getAsJsonObject);
147 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
148 .isEqualTo(MAX_BATCH_SIZE);
149 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
153 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
155 final List<String> threePlainMessages = List.of("I", "like", "cookies");
156 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
158 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
159 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
162 final Flux<MessageRouterPublishResponse> responses = cut
163 .put(plainPublishRequest, plainMessagesMaxBatch);
164 responses.then().block();
167 verify(httpClient).call(httpRequestArgumentCaptor.capture());
168 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
169 final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
170 .map(JsonElement::getAsJsonPrimitive);
172 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
173 .isEqualTo(MAX_BATCH_SIZE);
174 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
178 void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
180 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
181 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
182 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
185 final Flux<MessageRouterPublishResponse> responses = cut
186 .put(jsonPublishRequest, singleJsonMessageBatch);
187 responses.then().block();
190 verify(httpClient).call(httpRequestArgumentCaptor.capture());
191 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
192 assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
193 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
197 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
199 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
200 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
202 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
203 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
206 final Flux<MessageRouterPublishResponse> responses = cut
207 .put(jsonPublishRequest, jsonMessagesMaxBatch);
210 verifySingleResponse(parsedThreeMessages, responses);
214 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
216 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
217 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
219 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
220 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
223 final Flux<MessageRouterPublishResponse> responses = cut
224 .put(plainPublishRequest, plainMessagesMaxBatch);
227 verifySingleResponse(parsedThreeMessages, responses);
231 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
233 final List<String> threePlainMessages = List.of("I", "like", "cookies");
234 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
236 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
237 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
240 final Flux<MessageRouterPublishResponse> responses = cut
241 .put(plainPublishRequest, plainMessagesMaxBatch);
244 verifySingleResponse(parsedThreeMessages, responses);
248 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
250 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
251 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
253 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
254 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
256 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
257 threeJsonMessages.appendAll(twoJsonMessages));
258 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
261 final Flux<MessageRouterPublishResponse> responses = cut
262 .put(jsonPublishRequest, doubleJsonMessageBatch);
264 responses.then().block();
266 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
267 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
268 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
270 final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
271 assertThat(firstRequest.size()).describedAs("Http request first batch size")
272 .isEqualTo(MAX_BATCH_SIZE);
273 assertListsContainSameElements(firstRequest, parsedThreeMessages);
275 final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
276 assertThat(secondRequest.size()).describedAs("Http request second batch size")
277 .isEqualTo(MAX_BATCH_SIZE - 1);
278 assertListsContainSameElements(secondRequest, parsedTwoMessages);
282 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
284 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
285 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
287 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
288 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
290 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
291 threeJsonMessages.appendAll(twoJsonMessages));
292 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
295 final Flux<MessageRouterPublishResponse> responses = cut
296 .put(plainPublishRequest, doublePlainMessageBatch);
298 responses.then().block();
300 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
301 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
302 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
304 final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
305 .map(JsonElement::getAsJsonObject);
306 assertThat(firstRequest.size()).describedAs("Http request first batch size")
307 .isEqualTo(MAX_BATCH_SIZE);
308 assertListsContainSameElements(firstRequest, parsedThreeMessages);
310 final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
311 .map(JsonElement::getAsJsonObject);
312 assertThat(secondRequest.size()).describedAs("Http request second batch size")
313 .isEqualTo(MAX_BATCH_SIZE - 1);
314 assertListsContainSameElements(secondRequest, parsedTwoMessages);
318 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
320 final List<String> threePlainMessages = List.of("I", "like", "cookies");
321 final List<String> twoPlainMessages = List.of("and", "pierogi");
323 final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
324 final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
326 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
327 threePlainMessages.appendAll(twoPlainMessages));
328 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
331 final Flux<MessageRouterPublishResponse> responses = cut
332 .put(plainPublishRequest, doublePlainMessageBatch);
334 responses.then().block();
336 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
337 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
338 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
340 final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
341 .map(JsonElement::getAsJsonPrimitive);
342 assertThat(firstRequest.size()).describedAs("Http request first batch size")
343 .isEqualTo(MAX_BATCH_SIZE);
344 assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
346 final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
347 .map(JsonElement::getAsJsonPrimitive);
348 assertThat(secondRequest.size()).describedAs("Http request second batch size")
349 .isEqualTo(MAX_BATCH_SIZE - 1);
350 assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
354 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
356 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
357 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
359 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
360 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
362 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
363 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
366 final Flux<MessageRouterPublishResponse> responses = cut
367 .put(jsonPublishRequest, doubleJsonMessageBatch);
370 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
374 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
376 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
377 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
379 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
380 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
382 final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
383 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
386 final Flux<MessageRouterPublishResponse> responses = cut
387 .put(plainPublishRequest, doubleJsonMessageBatch);
390 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
394 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
396 final List<String> threePlainMessages = List.of("I", "like", "cookies");
397 final List<String> twoPlainMessages = List.of("and", "pierogi");
399 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
400 final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
402 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
403 threePlainMessages.appendAll(twoPlainMessages));
404 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
407 final Flux<MessageRouterPublishResponse> responses = cut
408 .put(plainPublishRequest, doublePlainMessageBatch);
411 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
415 void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
417 final List<String> plainMessage = List.of("I", "like", "cookies");
419 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
420 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
423 final Flux<MessageRouterPublishResponse> responses = cut
424 .put(plainPublishRequest, plainMessagesMaxBatch);
427 StepVerifier.create(responses)
428 .consumeNextWith(this::assertTimeoutError)
434 void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
436 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
437 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
439 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
441 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
442 given(httpClient.call(any(HttpRequest.class)))
443 .willReturn(Mono.just(successHttpResponse))
444 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
446 final Flux<MessageRouterPublishResponse> responses = cut
447 .put(jsonPublishRequest, doubleJsonMessageBatch);
450 StepVerifier.create(responses)
451 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
452 .consumeNextWith(this::assertTimeoutError)
458 void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
460 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
461 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
463 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
465 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
466 given(httpClient.call(any(HttpRequest.class)))
467 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
468 .willReturn(Mono.just(successHttpResponse));
470 final Flux<MessageRouterPublishResponse> responses = cut
471 .put(jsonPublishRequest, doubleJsonMessageBatch);
474 StepVerifier.create(responses)
475 .consumeNextWith(this::assertTimeoutError)
476 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
481 private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
482 return plainTextMessages
483 .map(message -> String.format("{\"message\":\"%s\"}", message));
486 private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
487 return ImmutableHttpResponse.builder()
488 .statusCode(statusCode)
490 .statusReason(statusReason)
491 .rawBody("[]".getBytes())
495 private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
496 final String body = Flux.from(httpRequest.body().contents())
497 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
498 (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
499 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
501 assertThat(body).describedAs("request body").isNotBlank();
506 private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
507 return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
510 private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
511 return getAsJsonElements(
513 collectNonEmptyRequestBody(httpRequest)
519 private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
520 List<? extends JsonElement> expectedMessages) {
521 for (int i = 0; i < actualMessages.size(); i++) {
522 assertThat(actualMessages.get(i))
523 .describedAs(String.format("Http request element at position %d", i))
524 .isEqualTo(expectedMessages.get(i));
528 private void assertListsContainSameElements(JsonArray actualMessages,
529 List<? extends JsonElement> expectedMessages) {
530 assertThat(actualMessages.size()).describedAs("Http request batch size")
531 .isEqualTo(expectedMessages.size());
533 for (int i = 0; i < actualMessages.size(); i++) {
534 assertThat(actualMessages.get(i))
535 .describedAs(String.format("Http request element at position %d", i))
536 .isEqualTo(expectedMessages.get(i));
540 private void assertTimeoutError(MessageRouterPublishResponse response) {
541 assertThat(response.failed()).isTrue();
542 assertThat(response.items()).isEmpty();
543 assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER);
546 private void verifySingleResponse(List<? extends JsonElement> threeMessages,
547 Flux<MessageRouterPublishResponse> responses) {
548 StepVerifier.create(responses)
549 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
554 private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
555 List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
556 StepVerifier.create(responses)
557 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
558 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
563 private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
564 assertThat(response.successful()).describedAs("successful").isTrue();
565 JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
566 assertThat(response.items()).containsExactly(jsonElements);