2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.List;
32 import org.junit.jupiter.api.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
45 import reactor.core.publisher.Flux;
46 import reactor.core.publisher.Mono;
47 import reactor.test.StepVerifier;
49 import java.net.ConnectException;
50 import java.nio.charset.StandardCharsets;
51 import java.time.Duration;
53 import static org.assertj.core.api.Assertions.assertThat;
54 import static org.mockito.ArgumentMatchers.any;
55 import static org.mockito.BDDMockito.given;
56 import static org.mockito.Mockito.mock;
57 import static org.mockito.Mockito.times;
58 import static org.mockito.Mockito.verify;
59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
67 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
// Test class for MessageRouterPublisherImpl. Both collaborators handed to the publisher are
// Mockito mocks, so tests inspect the HttpRequest objects passed to the mocked RxHttpClient.
70 class MessageRouterPublisherImplTest {
// NOTE(review): judging by the name, a wait limit for reactive assertions — confirm at the usage sites.
71 private static final Duration TIMEOUT = Duration.ofSeconds(5);
72 private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
// Passed to the publisher under test; the batching tests assert request sizes against this value.
73 private static final int MAX_BATCH_SIZE = 3;
// Text the mocked ClientErrorReasonPresenter is stubbed to return in the failure tests.
74 private static final String ERROR_MESSAGE = "Something went wrong";
75 private final RxHttpClient httpClient = mock(RxHttpClient.class);
76 private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
// Class under test.
// NOTE(review): the meaning of the one-minute Duration argument is not visible here — check the constructor.
77 private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
78 httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
// Captures the requests given to httpClient.call(...) so tests can inspect them afterwards.
79 private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
// Publish request created with ContentType.TEXT_PLAIN.
80 private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
// Publish request created without an explicit content type.
81 private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
// Canned responses for the mocked HTTP client: status 200 "OK" and status 500 "ERROR".
82 private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
83 private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
86 void puttingElementsShouldYieldNonChunkedHttpRequest() {
88 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
89 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
90 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
93 final Flux<MessageRouterPublishResponse> responses = cut
94 .put(jsonPublishRequest, singleJsonMessageBatch);
95 responses.then().block();
98 verify(httpClient).call(httpRequestArgumentCaptor.capture());
99 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
100 assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
101 assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
102 assertThat(httpRequest.body()).isNotNull();
103 assertThat(httpRequest.body().length()).isPositive();
107 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
109 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
110 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
112 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
113 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
116 final Flux<MessageRouterPublishResponse> responses = cut
117 .put(jsonPublishRequest, jsonMessagesMaxBatch);
118 responses.then().block();
121 verify(httpClient).call(httpRequestArgumentCaptor.capture());
122 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
123 final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
125 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
126 .isEqualTo(MAX_BATCH_SIZE);
127 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
132 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
134 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
135 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
137 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
138 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
141 final Flux<MessageRouterPublishResponse> responses = cut
142 .put(plainPublishRequest, plainMessagesMaxBatch);
143 responses.then().block();
146 verify(httpClient).call(httpRequestArgumentCaptor.capture());
147 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
148 final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
149 .map(JsonElement::getAsJsonObject);
152 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
153 .isEqualTo(MAX_BATCH_SIZE);
154 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
158 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
160 final List<String> threePlainMessages = List.of("I", "like", "cookies");
161 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
163 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
164 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
167 final Flux<MessageRouterPublishResponse> responses = cut
168 .put(plainPublishRequest, plainMessagesMaxBatch);
169 responses.then().block();
172 verify(httpClient).call(httpRequestArgumentCaptor.capture());
173 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
174 final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
175 .map(JsonElement::getAsJsonPrimitive);
177 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
178 .isEqualTo(MAX_BATCH_SIZE);
179 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
183 void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
185 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
186 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
187 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
190 final Flux<MessageRouterPublishResponse> responses = cut
191 .put(jsonPublishRequest, singleJsonMessageBatch);
192 responses.then().block();
195 verify(httpClient).call(httpRequestArgumentCaptor.capture());
196 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
197 assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
198 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
202 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
204 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
205 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
207 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
208 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
211 final Flux<MessageRouterPublishResponse> responses = cut
212 .put(jsonPublishRequest, jsonMessagesMaxBatch);
215 verifySingleResponse(parsedThreeMessages, responses);
219 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
221 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
222 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
224 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
225 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
228 final Flux<MessageRouterPublishResponse> responses = cut
229 .put(plainPublishRequest, plainMessagesMaxBatch);
232 verifySingleResponse(parsedThreeMessages, responses);
236 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
238 final List<String> threePlainMessages = List.of("I", "like", "cookies");
239 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
241 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
242 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
245 final Flux<MessageRouterPublishResponse> responses = cut
246 .put(plainPublishRequest, plainMessagesMaxBatch);
249 verifySingleResponse(parsedThreeMessages, responses);
253 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
255 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
256 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
258 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
259 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
261 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
262 threeJsonMessages.appendAll(twoJsonMessages));
263 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
266 final Flux<MessageRouterPublishResponse> responses = cut
267 .put(jsonPublishRequest, doubleJsonMessageBatch);
269 responses.then().block();
271 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
272 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
273 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
275 final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
276 assertThat(firstRequest.size()).describedAs("Http request first batch size")
277 .isEqualTo(MAX_BATCH_SIZE);
278 assertListsContainSameElements(firstRequest, parsedThreeMessages);
280 final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
281 assertThat(secondRequest.size()).describedAs("Http request second batch size")
282 .isEqualTo(MAX_BATCH_SIZE - 1);
283 assertListsContainSameElements(secondRequest, parsedTwoMessages);
287 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
289 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
290 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
292 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
293 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
295 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
296 threeJsonMessages.appendAll(twoJsonMessages));
297 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
300 final Flux<MessageRouterPublishResponse> responses = cut
301 .put(plainPublishRequest, doublePlainMessageBatch);
303 responses.then().block();
305 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
306 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
307 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
309 final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
310 .map(JsonElement::getAsJsonObject);
311 assertThat(firstRequest.size()).describedAs("Http request first batch size")
312 .isEqualTo(MAX_BATCH_SIZE);
313 assertListsContainSameElements(firstRequest, parsedThreeMessages);
315 final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
316 .map(JsonElement::getAsJsonObject);
317 assertThat(secondRequest.size()).describedAs("Http request second batch size")
318 .isEqualTo(MAX_BATCH_SIZE - 1);
319 assertListsContainSameElements(secondRequest, parsedTwoMessages);
323 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
325 final List<String> threePlainMessages = List.of("I", "like", "cookies");
326 final List<String> twoPlainMessages = List.of("and", "pierogi");
328 final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
329 final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
331 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
332 threePlainMessages.appendAll(twoPlainMessages));
333 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
336 final Flux<MessageRouterPublishResponse> responses = cut
337 .put(plainPublishRequest, doublePlainMessageBatch);
339 responses.then().block();
341 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
342 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
343 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
345 final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
346 .map(JsonElement::getAsJsonPrimitive);
347 assertThat(firstRequest.size()).describedAs("Http request first batch size")
348 .isEqualTo(MAX_BATCH_SIZE);
349 assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
351 final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
352 .map(JsonElement::getAsJsonPrimitive);
353 assertThat(secondRequest.size()).describedAs("Http request second batch size")
354 .isEqualTo(MAX_BATCH_SIZE - 1);
355 assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
359 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
361 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
362 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
364 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
365 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
367 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
368 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
371 final Flux<MessageRouterPublishResponse> responses = cut
372 .put(jsonPublishRequest, doubleJsonMessageBatch);
375 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
379 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
381 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
382 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
384 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
385 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
387 final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
388 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
391 final Flux<MessageRouterPublishResponse> responses = cut
392 .put(plainPublishRequest, doubleJsonMessageBatch);
395 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
399 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
401 final List<String> threePlainMessages = List.of("I", "like", "cookies");
402 final List<String> twoPlainMessages = List.of("and", "pierogi");
404 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
405 final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
407 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
408 threePlainMessages.appendAll(twoPlainMessages));
409 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
412 final Flux<MessageRouterPublishResponse> responses = cut
413 .put(plainPublishRequest, doublePlainMessageBatch);
416 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
// Stubs the error presenter to return ERROR_MESSAGE and the HTTP client to fail with
// ReadTimeoutException, publishes three plain messages, and expects the next emitted
// response to satisfy assertFailedResponse.
// NOTE(review): no terminal StepVerifier step or closing brace is visible after the last
// line; the embedded numbering jumps from 436 to 442, so the end of this method appears to
// be missing from this extract — confirm against the full file.
420 void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
422 final List<String> plainMessage = List.of("I", "like", "cookies");
424 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
425 given(clientErrorReasonPresenter.present(any()))
426 .willReturn(ERROR_MESSAGE);
427 given(httpClient.call(any(HttpRequest.class)))
428 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
431 final Flux<MessageRouterPublishResponse> responses = cut
432 .put(plainPublishRequest, plainMessagesMaxBatch);
435 StepVerifier.create(responses)
436 .consumeNextWith(this::assertFailedResponse)
// Stubs the error presenter to return ERROR_MESSAGE and the HTTP client to fail with a
// ConnectException, publishes three plain messages, and expects the next emitted response
// to satisfy assertFailedResponse.
// NOTE(review): no terminal StepVerifier step or closing brace is visible after the last
// line; the embedded numbering jumps from 458 to 464, so the end of this method appears to
// be missing from this extract — confirm against the full file.
442 void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
444 final List<String> plainMessage = List.of("I", "like", "cookies");
446 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
447 given(clientErrorReasonPresenter.present(any()))
448 .willReturn(ERROR_MESSAGE);
449 given(httpClient.call(any(HttpRequest.class)))
450 .willReturn(Mono.error(new ConnectException()));
453 final Flux<MessageRouterPublishResponse> responses = cut
454 .put(plainPublishRequest, plainMessagesMaxBatch);
457 StepVerifier.create(responses)
458 .consumeNextWith(this::assertFailedResponse)
// Makes the HTTP client answer with the canned 500 "ERROR" response, publishes three plain
// messages, and expects the next emitted response to satisfy assertRetryableFailedResponse.
// Unlike the exception-based tests, the error presenter is not stubbed here.
// NOTE(review): no terminal StepVerifier step or closing brace is visible after the last
// line; the embedded numbering jumps from 478 to 484, so the end of this method appears to
// be missing from this extract — confirm against the full file.
464 void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
466 final List<String> plainMessage = List.of("I", "like", "cookies");
468 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
469 given(httpClient.call(any(HttpRequest.class)))
470 .willReturn(Mono.just(retryableHttpResponse));
473 final Flux<MessageRouterPublishResponse> responses = cut
474 .put(plainPublishRequest, plainMessagesMaxBatch);
477 StepVerifier.create(responses)
478 .consumeNextWith(this::assertRetryableFailedResponse)
// Publishes five JSON messages while the HTTP client succeeds on its first call and fails
// with ReadTimeoutException on the second. Expects a successful response carrying the first
// three messages, followed by a response that satisfies assertFailedResponse.
// NOTE(review): no terminal StepVerifier step or closing brace is visible after the last
// line; the embedded numbering jumps from 505 to 511, so the end of this method appears to
// be missing from this extract — confirm against the full file.
484 void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
486 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
487 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
489 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
491 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
492 given(clientErrorReasonPresenter.present(any()))
493 .willReturn(ERROR_MESSAGE);
// Consecutive willReturn stubs apply to consecutive calls: success first, then the timeout.
494 given(httpClient.call(any(HttpRequest.class)))
495 .willReturn(Mono.just(successHttpResponse))
496 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
499 final Flux<MessageRouterPublishResponse> responses = cut
500 .put(jsonPublishRequest, doubleJsonMessageBatch);
503 StepVerifier.create(responses)
504 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
505 .consumeNextWith(this::assertFailedResponse)
// Publishes five JSON messages while the HTTP client fails with ReadTimeoutException on its
// first call and succeeds on the second. Expects a response that satisfies
// assertFailedResponse, followed by a successful response carrying the last two messages.
// NOTE(review): no terminal StepVerifier step or closing brace is visible after the last
// line; the embedded numbering jumps from 532 to 537, so the end of this method appears to
// be missing from this extract — confirm against the full file.
511 void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
513 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
514 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
516 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
518 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
519 given(clientErrorReasonPresenter.present(any()))
520 .willReturn(ERROR_MESSAGE);
// Consecutive willReturn stubs apply to consecutive calls: the timeout first, then success.
521 given(httpClient.call(any(HttpRequest.class)))
522 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
523 .willReturn(Mono.just(successHttpResponse));
526 final Flux<MessageRouterPublishResponse> responses = cut
527 .put(jsonPublishRequest, doubleJsonMessageBatch);
530 StepVerifier.create(responses)
531 .consumeNextWith(this::assertFailedResponse)
532 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
537 private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
538 return plainTextMessages
539 .map(message -> String.format("{\"message\":\"%s\"}", message));
// Starts building an immutable HttpResponse with the given status code and reason and a raw
// body holding the bytes of "[]".
// NOTE(review): "[]".getBytes() relies on the platform default charset; passing
// StandardCharsets.UTF_8 (already imported in this file) would make the encoding explicit.
// NOTE(review): the embedded numbering skips 545 and everything after 547, so one builder
// call and the end of the chain appear to be missing from this extract — confirm against
// the full file.
542 private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
543 return ImmutableHttpResponse.builder()
544 .statusCode(statusCode)
546 .statusReason(statusReason)
547 .rawBody("[]".getBytes())
// Gathers the buffers published by the request body into one composite buffer, decodes it
// as UTF-8 text and asserts that the text is not blank.
// NOTE(review): the step that resolves the reactive chain to a String and the return
// statement are not visible (the embedded numbering skips 556 and everything after 557) —
// confirm against the full file.
// NOTE(review): the composite buffer is not released in the visible lines; likely harmless
// in a test, but worth confirming.
551 private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
552 final String body = Flux.from(httpRequest.body().contents())
553 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
// The "true" flag advances the composite's writer index so the appended bytes are readable.
554 (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
555 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
557 assertThat(body).describedAs("request body").isNotBlank();
562 private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
563 return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
// Reads the request body as text via collectNonEmptyRequestBody and feeds it to
// getAsJsonElements to obtain the body's elements.
// NOTE(review): the argument expression is incomplete in this view (the embedded numbering
// skips 568 and everything after 569), so how the body text is split or wrapped before
// parsing cannot be told from here — confirm against the full file.
566 private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
567 return getAsJsonElements(
569 collectNonEmptyRequestBody(httpRequest)
575 private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
576 List<? extends JsonElement> expectedMessages) {
577 for (int i = 0; i < actualMessages.size(); i++) {
578 assertThat(actualMessages.get(i))
579 .describedAs(String.format("Http request element at position %d", i))
580 .isEqualTo(expectedMessages.get(i));
584 private void assertListsContainSameElements(JsonArray actualMessages,
585 List<? extends JsonElement> expectedMessages) {
586 assertThat(actualMessages.size()).describedAs("Http request batch size")
587 .isEqualTo(expectedMessages.size());
589 for (int i = 0; i < actualMessages.size(); i++) {
590 assertThat(actualMessages.get(i))
591 .describedAs(String.format("Http request element at position %d", i))
592 .isEqualTo(expectedMessages.get(i));
596 private void assertFailedResponse(MessageRouterPublishResponse response) {
597 assertThat(response.failed()).isTrue();
598 assertThat(response.items()).isEmpty();
599 assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
602 private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
603 assertThat(response.failed()).isTrue();
604 assertThat(response.items()).isEmpty();
605 assertThat(response.failReason()).startsWith("500 ERROR");
// Subscribes to the response stream through StepVerifier and checks that the next emitted
// response is successful and carries exactly the given messages (see verifySuccessfulResponses).
// NOTE(review): no terminal StepVerifier step or closing brace is visible after the last
// line; the embedded numbering jumps from 611 to 616, so the end of this method appears to
// be missing from this extract — confirm against the full file.
608 private void verifySingleResponse(List<? extends JsonElement> threeMessages,
609 Flux<MessageRouterPublishResponse> responses) {
610 StepVerifier.create(responses)
611 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
// Subscribes to the response stream through StepVerifier and checks two consecutive
// responses: the first must be successful and carry threeMessages, the second must be
// successful and carry twoMessages (see verifySuccessfulResponses).
// NOTE(review): no terminal StepVerifier step or closing brace is visible after the last
// line; the embedded numbering jumps from 620 to 625, so the end of this method appears to
// be missing from this extract — confirm against the full file.
616 private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
617 List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
618 StepVerifier.create(responses)
619 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
620 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
625 private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
626 assertThat(response.successful()).describedAs("successful").isTrue();
627 JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
628 assertThat(response.items()).containsExactly(jsonElements);