2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.List;
32 import org.junit.jupiter.api.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
49 import java.nio.charset.StandardCharsets;
50 import java.time.Duration;
52 import static org.assertj.core.api.Assertions.assertThat;
53 import static org.mockito.ArgumentMatchers.any;
54 import static org.mockito.BDDMockito.given;
55 import static org.mockito.Mockito.mock;
56 import static org.mockito.Mockito.times;
57 import static org.mockito.Mockito.verify;
58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
66 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
// Unit tests for MessageRouterPublisherImpl. The HTTP client and the error-reason presenter are
// Mockito mocks, so no network traffic takes place.
// NOTE(review): this copy of the file shows no annotations or closing braces — confirm against the original.
69 class MessageRouterPublisherImplTest {
// Five-second duration; presumably the upper bound for waits in these tests — TODO confirm usages.
70 private static final Duration TIMEOUT = Duration.ofSeconds(5);
// Topic URL handed to createPublishRequest and expected as the URL of the captured HTTP requests.
71 private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
// Passed to the publisher under test; the tests expect at most this many messages per HTTP request.
72 private static final int MAX_BATCH_SIZE = 3;
// Text the mocked presenter returns and that failed responses are expected to expose as failReason.
73 private static final String ERROR_MESSAGE = "Something went wrong";
74 private final RxHttpClient httpClient = mock(RxHttpClient.class);
75 private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
// Class under test. NOTE(review): the one-minute Duration is presumably a timeout setting — confirm
// against the MessageRouterPublisherImpl constructor.
76 private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
77 httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
// Captures the HttpRequest objects handed to httpClient.call so their contents can be inspected.
78 private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
// Publish request created with an explicit text/plain content type.
79 private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
// Publish request created without an explicit content type.
80 private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
// Canned 200 "OK" response used as the stubbed answer of the mocked HTTP client.
81 private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
84 void puttingElementsShouldYieldNonChunkedHttpRequest() {
86 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
87 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
88 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
91 final Flux<MessageRouterPublishResponse> responses = cut
92 .put(jsonPublishRequest, singleJsonMessageBatch);
93 responses.then().block();
96 verify(httpClient).call(httpRequestArgumentCaptor.capture());
97 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
98 assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
99 assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
100 assertThat(httpRequest.body()).isNotNull();
101 assertThat(httpRequest.body().length()).isPositive();
105 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
107 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
108 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
110 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
111 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
114 final Flux<MessageRouterPublishResponse> responses = cut
115 .put(jsonPublishRequest, jsonMessagesMaxBatch);
116 responses.then().block();
119 verify(httpClient).call(httpRequestArgumentCaptor.capture());
120 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
121 final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
123 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
124 .isEqualTo(MAX_BATCH_SIZE);
125 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
130 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
132 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
133 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
135 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
136 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
139 final Flux<MessageRouterPublishResponse> responses = cut
140 .put(plainPublishRequest, plainMessagesMaxBatch);
141 responses.then().block();
144 verify(httpClient).call(httpRequestArgumentCaptor.capture());
145 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
146 final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
147 .map(JsonElement::getAsJsonObject);
150 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
151 .isEqualTo(MAX_BATCH_SIZE);
152 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
156 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
158 final List<String> threePlainMessages = List.of("I", "like", "cookies");
159 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
161 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
162 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
165 final Flux<MessageRouterPublishResponse> responses = cut
166 .put(plainPublishRequest, plainMessagesMaxBatch);
167 responses.then().block();
170 verify(httpClient).call(httpRequestArgumentCaptor.capture());
171 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
172 final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
173 .map(JsonElement::getAsJsonPrimitive);
175 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
176 .isEqualTo(MAX_BATCH_SIZE);
177 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
181 void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
183 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
184 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
185 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
188 final Flux<MessageRouterPublishResponse> responses = cut
189 .put(jsonPublishRequest, singleJsonMessageBatch);
190 responses.then().block();
193 verify(httpClient).call(httpRequestArgumentCaptor.capture());
194 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
195 assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
196 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
200 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
202 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
203 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
205 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
206 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
209 final Flux<MessageRouterPublishResponse> responses = cut
210 .put(jsonPublishRequest, jsonMessagesMaxBatch);
213 verifySingleResponse(parsedThreeMessages, responses);
217 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
219 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
220 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
222 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
223 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
226 final Flux<MessageRouterPublishResponse> responses = cut
227 .put(plainPublishRequest, plainMessagesMaxBatch);
230 verifySingleResponse(parsedThreeMessages, responses);
234 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
236 final List<String> threePlainMessages = List.of("I", "like", "cookies");
237 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
239 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
240 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
243 final Flux<MessageRouterPublishResponse> responses = cut
244 .put(plainPublishRequest, plainMessagesMaxBatch);
247 verifySingleResponse(parsedThreeMessages, responses);
251 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
253 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
254 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
256 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
257 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
259 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
260 threeJsonMessages.appendAll(twoJsonMessages));
261 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
264 final Flux<MessageRouterPublishResponse> responses = cut
265 .put(jsonPublishRequest, doubleJsonMessageBatch);
267 responses.then().block();
269 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
270 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
271 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
273 final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
274 assertThat(firstRequest.size()).describedAs("Http request first batch size")
275 .isEqualTo(MAX_BATCH_SIZE);
276 assertListsContainSameElements(firstRequest, parsedThreeMessages);
278 final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
279 assertThat(secondRequest.size()).describedAs("Http request second batch size")
280 .isEqualTo(MAX_BATCH_SIZE - 1);
281 assertListsContainSameElements(secondRequest, parsedTwoMessages);
285 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
287 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
288 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
290 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
291 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
293 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
294 threeJsonMessages.appendAll(twoJsonMessages));
295 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
298 final Flux<MessageRouterPublishResponse> responses = cut
299 .put(plainPublishRequest, doublePlainMessageBatch);
301 responses.then().block();
303 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
304 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
305 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
307 final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
308 .map(JsonElement::getAsJsonObject);
309 assertThat(firstRequest.size()).describedAs("Http request first batch size")
310 .isEqualTo(MAX_BATCH_SIZE);
311 assertListsContainSameElements(firstRequest, parsedThreeMessages);
313 final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
314 .map(JsonElement::getAsJsonObject);
315 assertThat(secondRequest.size()).describedAs("Http request second batch size")
316 .isEqualTo(MAX_BATCH_SIZE - 1);
317 assertListsContainSameElements(secondRequest, parsedTwoMessages);
321 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
323 final List<String> threePlainMessages = List.of("I", "like", "cookies");
324 final List<String> twoPlainMessages = List.of("and", "pierogi");
326 final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
327 final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
329 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
330 threePlainMessages.appendAll(twoPlainMessages));
331 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
334 final Flux<MessageRouterPublishResponse> responses = cut
335 .put(plainPublishRequest, doublePlainMessageBatch);
337 responses.then().block();
339 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
340 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
341 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
343 final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
344 .map(JsonElement::getAsJsonPrimitive);
345 assertThat(firstRequest.size()).describedAs("Http request first batch size")
346 .isEqualTo(MAX_BATCH_SIZE);
347 assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
349 final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
350 .map(JsonElement::getAsJsonPrimitive);
351 assertThat(secondRequest.size()).describedAs("Http request second batch size")
352 .isEqualTo(MAX_BATCH_SIZE - 1);
353 assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
357 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
359 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
360 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
362 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
363 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
365 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
366 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
369 final Flux<MessageRouterPublishResponse> responses = cut
370 .put(jsonPublishRequest, doubleJsonMessageBatch);
373 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
377 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
379 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
380 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
382 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
383 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
385 final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
386 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
389 final Flux<MessageRouterPublishResponse> responses = cut
390 .put(plainPublishRequest, doubleJsonMessageBatch);
393 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
397 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
399 final List<String> threePlainMessages = List.of("I", "like", "cookies");
400 final List<String> twoPlainMessages = List.of("and", "pierogi");
402 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
403 final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
405 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
406 threePlainMessages.appendAll(twoPlainMessages));
407 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
410 final Flux<MessageRouterPublishResponse> responses = cut
411 .put(plainPublishRequest, doublePlainMessageBatch);
414 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
// Scenario: the mocked HTTP client answers the only batch with ReadTimeoutException; the first
// emitted response must satisfy assertTimeoutError.
418 void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
420 final List<String> plainMessage = List.of("I", "like", "cookies");
422 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
// The presenter is stubbed so the fail reason of the response is the known ERROR_MESSAGE text.
423 given(clientErrorReasonPresenter.present(any()))
424 .willReturn(ERROR_MESSAGE);
425 given(httpClient.call(any(HttpRequest.class)))
426 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
429 final Flux<MessageRouterPublishResponse> responses = cut
430 .put(plainPublishRequest, plainMessagesMaxBatch);
// NOTE(review): the terminal steps of this StepVerifier chain are not visible in this view.
433 StepVerifier.create(responses)
434 .consumeNextWith(this::assertTimeoutError)
// Scenario: five JSON messages are published; the mocked HTTP client succeeds on its first call and
// fails with ReadTimeoutException on the next. Expected: a successful response for the first three
// messages, then a timeout error response.
440 void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
442 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
443 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
445 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
447 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
448 given(clientErrorReasonPresenter.present(any()))
449 .willReturn(ERROR_MESSAGE);
// Consecutive stubbing: first call returns the success response, later calls return the timeout error.
450 given(httpClient.call(any(HttpRequest.class)))
451 .willReturn(Mono.just(successHttpResponse))
452 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
455 final Flux<MessageRouterPublishResponse> responses = cut
456 .put(jsonPublishRequest, doubleJsonMessageBatch);
// NOTE(review): the terminal steps of this StepVerifier chain are not visible in this view.
459 StepVerifier.create(responses)
460 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
461 .consumeNextWith(this::assertTimeoutError)
// Scenario: five JSON messages are published; the mocked HTTP client fails with ReadTimeoutException
// on its first call and succeeds on the next. Expected: a timeout error response first, then a
// successful response for the last two messages.
467 void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
469 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
470 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
472 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
474 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
475 given(clientErrorReasonPresenter.present(any()))
476 .willReturn(ERROR_MESSAGE);
// Consecutive stubbing: first call returns the timeout error, later calls return the success response.
477 given(httpClient.call(any(HttpRequest.class)))
478 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
479 .willReturn(Mono.just(successHttpResponse));
482 final Flux<MessageRouterPublishResponse> responses = cut
483 .put(jsonPublishRequest, doubleJsonMessageBatch);
// NOTE(review): the terminal steps of this StepVerifier chain are not visible in this view.
486 StepVerifier.create(responses)
487 .consumeNextWith(this::assertTimeoutError)
488 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
493 private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
494 return plainTextMessages
495 .map(message -> String.format("{\"message\":\"%s\"}", message));
// Builds an HttpResponse with the given status code and reason and the raw body "[]".
// NOTE(review): the end of this builder chain is not visible in this view.
498 private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
499 return ImmutableHttpResponse.builder()
500 .statusCode(statusCode)
502 .statusReason(statusReason)
// NOTE(review): getBytes() without an argument uses the platform default charset; consider
// passing StandardCharsets.UTF_8 explicitly.
503 .rawBody("[]".getBytes())
// Gathers the request body contents into one composite buffer, decodes it as UTF-8 text and
// asserts that the text is not blank.
507 private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
508 final String body = Flux.from(httpRequest.body().contents())
509 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
// The leading 'true' makes addComponent advance the writer index so the added bytes become readable.
510 (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
511 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
// NOTE(review): the step that resolves the chain above to a String, and the return statement,
// are not visible in this view.
513 assertThat(body).describedAs("request body").isNotBlank();
518 private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
519 return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
// Collects the request body text and converts it to JSON elements through getAsJsonElements.
// NOTE(review): the rest of the argument expression (how the body text is turned into a list of
// messages) is not visible in this view.
522 private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
523 return getAsJsonElements(
525 collectNonEmptyRequestBody(httpRequest)
531 private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
532 List<? extends JsonElement> expectedMessages) {
533 for (int i = 0; i < actualMessages.size(); i++) {
534 assertThat(actualMessages.get(i))
535 .describedAs(String.format("Http request element at position %d", i))
536 .isEqualTo(expectedMessages.get(i));
540 private void assertListsContainSameElements(JsonArray actualMessages,
541 List<? extends JsonElement> expectedMessages) {
542 assertThat(actualMessages.size()).describedAs("Http request batch size")
543 .isEqualTo(expectedMessages.size());
545 for (int i = 0; i < actualMessages.size(); i++) {
546 assertThat(actualMessages.get(i))
547 .describedAs(String.format("Http request element at position %d", i))
548 .isEqualTo(expectedMessages.get(i));
552 private void assertTimeoutError(MessageRouterPublishResponse response) {
553 assertThat(response.failed()).isTrue();
554 assertThat(response.items()).isEmpty();
555 assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
// Subscribes to the responses with a StepVerifier and checks that the first emitted response is
// successful and carries the given messages.
// NOTE(review): the terminal steps of the StepVerifier chain are not visible in this view.
558 private void verifySingleResponse(List<? extends JsonElement> threeMessages,
559 Flux<MessageRouterPublishResponse> responses) {
560 StepVerifier.create(responses)
561 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
// Subscribes to the responses with a StepVerifier and checks that the first two emitted responses
// are successful and carry threeMessages and twoMessages respectively, in that order.
// NOTE(review): the terminal steps of the StepVerifier chain are not visible in this view.
566 private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
567 List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
568 StepVerifier.create(responses)
569 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
570 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
575 private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
576 assertThat(response.successful()).describedAs("successful").isTrue();
577 JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
578 assertThat(response.items()).containsExactly(jsonElements);