+ @Test
+ void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertTimeoutError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.just(successHttpResponse))
+ .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
+ .consumeNextWith(this::assertTimeoutError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
+ // given
+ final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+ final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+ final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+ final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
+ .willReturn(Mono.just(successHttpResponse));
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {