2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019-2021 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.netty.buffer.ByteBufAllocator;
29 import io.netty.handler.codec.http.HttpHeaderValues;
30 import io.netty.handler.timeout.ReadTimeoutException;
31 import io.vavr.collection.HashMultimap;
32 import io.vavr.collection.List;
33 import org.junit.jupiter.api.Test;
34 import org.mockito.ArgumentCaptor;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
39 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
40 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
46 import reactor.core.publisher.Flux;
47 import reactor.core.publisher.Mono;
48 import reactor.test.StepVerifier;
50 import java.net.ConnectException;
51 import java.nio.charset.StandardCharsets;
52 import java.time.Duration;
54 import static org.assertj.core.api.Assertions.assertThat;
55 import static org.mockito.ArgumentMatchers.any;
56 import static org.mockito.BDDMockito.given;
57 import static org.mockito.Mockito.mock;
58 import static org.mockito.Mockito.times;
59 import static org.mockito.Mockito.verify;
60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
68 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
71 class MessageRouterPublisherImplTest {
72 private static final Duration TIMEOUT = Duration.ofSeconds(5);
73 private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
74 private static final int MAX_BATCH_SIZE = 3;
75 private static final String ERROR_MESSAGE = "Something went wrong";
76 private final RxHttpClient httpClient = mock(RxHttpClient.class);
77 private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
78 private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
79 httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
80 private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
81 private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
82 private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
83 private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
84 private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
87 void puttingElementsShouldYieldNonChunkedHttpRequest() {
89 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
90 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
91 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
94 final Flux<MessageRouterPublishResponse> responses = cut
95 .put(jsonPublishRequest, singleJsonMessageBatch);
96 responses.then().block();
99 verify(httpClient).call(httpRequestArgumentCaptor.capture());
100 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
101 assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
102 assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
103 assertThat(httpRequest.body()).isNotNull();
104 assertThat(httpRequest.body().length()).isPositive();
108 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
110 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
111 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
113 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
114 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
117 final Flux<MessageRouterPublishResponse> responses = cut
118 .put(jsonPublishRequest, jsonMessagesMaxBatch);
119 responses.then().block();
122 verify(httpClient).call(httpRequestArgumentCaptor.capture());
123 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
124 final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
126 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
127 .isEqualTo(MAX_BATCH_SIZE);
128 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
133 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
135 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
136 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
138 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
139 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
142 final Flux<MessageRouterPublishResponse> responses = cut
143 .put(plainPublishRequest, plainMessagesMaxBatch);
144 responses.then().block();
147 verify(httpClient).call(httpRequestArgumentCaptor.capture());
148 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
149 final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
150 .map(JsonElement::getAsJsonObject);
153 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
154 .isEqualTo(MAX_BATCH_SIZE);
155 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
159 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
161 final List<String> threePlainMessages = List.of("I", "like", "cookies");
162 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
164 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
165 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
168 final Flux<MessageRouterPublishResponse> responses = cut
169 .put(plainPublishRequest, plainMessagesMaxBatch);
170 responses.then().block();
173 verify(httpClient).call(httpRequestArgumentCaptor.capture());
174 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
175 final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
176 .map(JsonElement::getAsJsonPrimitive);
178 assertThat(elementsInRequest.size()).describedAs("Http request batch size")
179 .isEqualTo(MAX_BATCH_SIZE);
180 assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
184 void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
186 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
187 final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
188 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
191 final Flux<MessageRouterPublishResponse> responses = cut
192 .put(jsonPublishRequest, singleJsonMessageBatch);
193 responses.then().block();
196 verify(httpClient).call(httpRequestArgumentCaptor.capture());
197 final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
198 assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
199 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
203 void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
205 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
206 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
208 final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
209 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
212 final Flux<MessageRouterPublishResponse> responses = cut
213 .put(jsonPublishRequest, jsonMessagesMaxBatch);
216 verifySingleResponse(parsedThreeMessages, responses);
220 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
222 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
223 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
225 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
226 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
229 final Flux<MessageRouterPublishResponse> responses = cut
230 .put(plainPublishRequest, plainMessagesMaxBatch);
233 verifySingleResponse(parsedThreeMessages, responses);
237 void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
239 final List<String> threePlainMessages = List.of("I", "like", "cookies");
240 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
242 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
243 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
246 final Flux<MessageRouterPublishResponse> responses = cut
247 .put(plainPublishRequest, plainMessagesMaxBatch);
250 verifySingleResponse(parsedThreeMessages, responses);
254 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
256 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
257 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
259 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
260 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
262 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
263 threeJsonMessages.appendAll(twoJsonMessages));
264 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
267 final Flux<MessageRouterPublishResponse> responses = cut
268 .put(jsonPublishRequest, doubleJsonMessageBatch);
270 responses.then().block();
272 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
273 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
274 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
276 final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
277 assertThat(firstRequest.size()).describedAs("Http request first batch size")
278 .isEqualTo(MAX_BATCH_SIZE);
279 assertListsContainSameElements(firstRequest, parsedThreeMessages);
281 final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
282 assertThat(secondRequest.size()).describedAs("Http request second batch size")
283 .isEqualTo(MAX_BATCH_SIZE - 1);
284 assertListsContainSameElements(secondRequest, parsedTwoMessages);
288 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
290 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
291 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
293 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
294 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
296 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
297 threeJsonMessages.appendAll(twoJsonMessages));
298 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
301 final Flux<MessageRouterPublishResponse> responses = cut
302 .put(plainPublishRequest, doublePlainMessageBatch);
304 responses.then().block();
306 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
307 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
308 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
310 final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
311 .map(JsonElement::getAsJsonObject);
312 assertThat(firstRequest.size()).describedAs("Http request first batch size")
313 .isEqualTo(MAX_BATCH_SIZE);
314 assertListsContainSameElements(firstRequest, parsedThreeMessages);
316 final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
317 .map(JsonElement::getAsJsonObject);
318 assertThat(secondRequest.size()).describedAs("Http request second batch size")
319 .isEqualTo(MAX_BATCH_SIZE - 1);
320 assertListsContainSameElements(secondRequest, parsedTwoMessages);
324 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
326 final List<String> threePlainMessages = List.of("I", "like", "cookies");
327 final List<String> twoPlainMessages = List.of("and", "pierogi");
329 final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
330 final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
332 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
333 threePlainMessages.appendAll(twoPlainMessages));
334 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
337 final Flux<MessageRouterPublishResponse> responses = cut
338 .put(plainPublishRequest, doublePlainMessageBatch);
340 responses.then().block();
342 verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
343 final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
344 assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
346 final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
347 .map(JsonElement::getAsJsonPrimitive);
348 assertThat(firstRequest.size()).describedAs("Http request first batch size")
349 .isEqualTo(MAX_BATCH_SIZE);
350 assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
352 final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
353 .map(JsonElement::getAsJsonPrimitive);
354 assertThat(secondRequest.size()).describedAs("Http request second batch size")
355 .isEqualTo(MAX_BATCH_SIZE - 1);
356 assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
360 void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
362 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
363 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
365 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
366 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
368 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
369 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
372 final Flux<MessageRouterPublishResponse> responses = cut
373 .put(jsonPublishRequest, doubleJsonMessageBatch);
376 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
380 void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
382 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
383 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
385 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
386 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
388 final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
389 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
392 final Flux<MessageRouterPublishResponse> responses = cut
393 .put(plainPublishRequest, doubleJsonMessageBatch);
396 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
400 void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
402 final List<String> threePlainMessages = List.of("I", "like", "cookies");
403 final List<String> twoPlainMessages = List.of("and", "pierogi");
405 final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
406 final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
408 final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
409 threePlainMessages.appendAll(twoPlainMessages));
410 given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
413 final Flux<MessageRouterPublishResponse> responses = cut
414 .put(plainPublishRequest, doublePlainMessageBatch);
417 verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
421 void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
423 final List<String> plainMessage = List.of("I", "like", "cookies");
425 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
426 given(clientErrorReasonPresenter.present(any()))
427 .willReturn(ERROR_MESSAGE);
428 given(httpClient.call(any(HttpRequest.class)))
429 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
432 final Flux<MessageRouterPublishResponse> responses = cut
433 .put(plainPublishRequest, plainMessagesMaxBatch);
436 StepVerifier.create(responses)
437 .consumeNextWith(this::assertFailedResponse)
443 void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
445 final List<String> plainMessage = List.of("I", "like", "cookies");
447 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
448 given(clientErrorReasonPresenter.present(any()))
449 .willReturn(ERROR_MESSAGE);
450 given(httpClient.call(any(HttpRequest.class)))
451 .willReturn(Mono.error(new ConnectException()));
454 final Flux<MessageRouterPublishResponse> responses = cut
455 .put(plainPublishRequest, plainMessagesMaxBatch);
458 StepVerifier.create(responses)
459 .consumeNextWith(this::assertFailedResponse)
465 void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
467 final List<String> plainMessage = List.of("I", "like", "cookies");
469 final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
470 given(httpClient.call(any(HttpRequest.class)))
471 .willReturn(Mono.just(retryableHttpResponse));
474 final Flux<MessageRouterPublishResponse> responses = cut
475 .put(plainPublishRequest, plainMessagesMaxBatch);
478 StepVerifier.create(responses)
479 .consumeNextWith(this::assertRetryableFailedResponse)
485 void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
487 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
488 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
490 final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
492 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
493 given(clientErrorReasonPresenter.present(any()))
494 .willReturn(ERROR_MESSAGE);
495 given(httpClient.call(any(HttpRequest.class)))
496 .willReturn(Mono.just(successHttpResponse))
497 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
500 final Flux<MessageRouterPublishResponse> responses = cut
501 .put(jsonPublishRequest, doubleJsonMessageBatch);
504 StepVerifier.create(responses)
505 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
506 .consumeNextWith(this::assertFailedResponse)
512 void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
514 final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
515 final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
517 final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
519 final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
520 given(clientErrorReasonPresenter.present(any()))
521 .willReturn(ERROR_MESSAGE);
522 given(httpClient.call(any(HttpRequest.class)))
523 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
524 .willReturn(Mono.just(successHttpResponse));
527 final Flux<MessageRouterPublishResponse> responses = cut
528 .put(jsonPublishRequest, doubleJsonMessageBatch);
531 StepVerifier.create(responses)
532 .consumeNextWith(this::assertFailedResponse)
533 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
538 private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
539 return plainTextMessages
540 .map(message -> String.format("{\"message\":\"%s\"}", message));
543 private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
544 return ImmutableHttpResponse.builder()
545 .statusCode(statusCode)
547 .statusReason(statusReason)
548 .rawBody("[]".getBytes())
549 .headers(HashMultimap.withSeq().empty())
553 private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
554 final String body = Flux.from(httpRequest.body().contents())
555 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
556 (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
557 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
559 assertThat(body).describedAs("request body").isNotBlank();
564 private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
565 return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
568 private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
569 return getAsJsonElements(
571 collectNonEmptyRequestBody(httpRequest)
577 private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
578 List<? extends JsonElement> expectedMessages) {
579 for (int i = 0; i < actualMessages.size(); i++) {
580 assertThat(actualMessages.get(i))
581 .describedAs(String.format("Http request element at position %d", i))
582 .isEqualTo(expectedMessages.get(i));
586 private void assertListsContainSameElements(JsonArray actualMessages,
587 List<? extends JsonElement> expectedMessages) {
588 assertThat(actualMessages.size()).describedAs("Http request batch size")
589 .isEqualTo(expectedMessages.size());
591 for (int i = 0; i < actualMessages.size(); i++) {
592 assertThat(actualMessages.get(i))
593 .describedAs(String.format("Http request element at position %d", i))
594 .isEqualTo(expectedMessages.get(i));
598 private void assertFailedResponse(MessageRouterPublishResponse response) {
599 assertThat(response.failed()).isTrue();
600 assertThat(response.items()).isEmpty();
601 assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
604 private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
605 assertThat(response.failed()).isTrue();
606 assertThat(response.items()).isEmpty();
607 assertThat(response.failReason()).startsWith("500 ERROR");
610 private void verifySingleResponse(List<? extends JsonElement> threeMessages,
611 Flux<MessageRouterPublishResponse> responses) {
612 StepVerifier.create(responses)
613 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
618 private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
619 List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
620 StepVerifier.create(responses)
621 .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
622 .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
627 private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
628 assertThat(response.successful()).describedAs("successful").isTrue();
629 JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
630 assertThat(response.items()).containsExactly(jsonElements);