38659acdb0c2c43c2a90d17105f90a19e6d67b72
[dcaegen2/services/sdk.git] / rest-services / dmaap-client / src / test / java / org / onap / dcaegen2 / services / sdk / rest / services / dmaap / client / impl / MessageRouterPublisherImplTest.java
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.BDDMockito.given;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.times;
28 import static org.mockito.Mockito.verify;
29 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
30
31 import com.google.gson.JsonArray;
32 import com.google.gson.JsonElement;
33 import com.google.gson.JsonObject;
34 import com.google.gson.Gson;
35 import com.google.gson.JsonPrimitive;
36 import io.netty.buffer.ByteBufAllocator;
37 import io.netty.handler.codec.http.HttpHeaderValues;
38 import io.vavr.collection.List;
39 import java.nio.charset.StandardCharsets;
40 import java.time.Duration;
41 import org.junit.jupiter.api.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
44 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
45 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
46 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
47 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
48 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
49 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
50 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
51 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
52 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55 import reactor.test.StepVerifier;
56
57 /**
58  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
59  * @since April 2019
60  */
61 class MessageRouterPublisherImplTest {
62     private static final Duration TIMEOUT = Duration.ofSeconds(5);
63     private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
64     private static final int MAX_BATCH_SIZE = 3;
65     private final RxHttpClient httpClient = mock(RxHttpClient.class);
66     private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
67     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
68     private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
69     private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
70     private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
71
72     @Test
73     void puttingElementsShouldYieldNonChunkedHttpRequest() {
74         // given
75         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
76         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
77         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
78
79         // when
80         final Flux<MessageRouterPublishResponse> responses = cut
81                 .put(jsonPublishRequest, singleJsonMessageBatch);
82         responses.then().block();
83
84         // then
85         verify(httpClient).call(httpRequestArgumentCaptor.capture());
86         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
87         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
88         assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
89         assertThat(httpRequest.body()).isNotNull();
90         assertThat(httpRequest.body().length()).isGreaterThan(0);
91     }
92
93     @Test
94     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
95         // given
96         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
97         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
98
99         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
100         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
101
102         // when
103         final Flux<MessageRouterPublishResponse> responses = cut
104                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
105         responses.then().block();
106
107         // then
108         verify(httpClient).call(httpRequestArgumentCaptor.capture());
109         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
110         final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
111
112         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
113                 .isEqualTo(MAX_BATCH_SIZE);
114         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
115     }
116
117
118
119     @Test
120     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
121         // given
122         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
123         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
124
125         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
126         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
127
128         // when
129         final Flux<MessageRouterPublishResponse> responses = cut
130                 .put(plainPublishRequest, plainMessagesMaxBatch);
131         responses.then().block();
132
133         // then
134         verify(httpClient).call(httpRequestArgumentCaptor.capture());
135         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
136         final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
137                 .map(JsonElement::getAsJsonObject);
138
139
140         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
141                 .isEqualTo(MAX_BATCH_SIZE);
142         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
143     }
144
145     @Test
146     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
147         // given
148         final List<String> threePlainMessages = List.of("I", "like", "cookies");
149         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
150
151         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
152         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
153
154         // when
155         final Flux<MessageRouterPublishResponse> responses = cut
156                 .put(plainPublishRequest, plainMessagesMaxBatch);
157         responses.then().block();
158
159         // then
160         verify(httpClient).call(httpRequestArgumentCaptor.capture());
161         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
162         final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
163                 .map(JsonElement::getAsJsonPrimitive);
164
165         assertThat(elementsInRequest.size()).describedAs("Http request batch size")
166                 .isEqualTo(MAX_BATCH_SIZE);
167         assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
168     }
169
170     @Test
171     void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
172         // given
173         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
174         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
175         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
176
177         // when
178         final Flux<MessageRouterPublishResponse> responses = cut
179                 .put(jsonPublishRequest, singleJsonMessageBatch);
180         responses.then().block();
181
182         // then
183         verify(httpClient).call(httpRequestArgumentCaptor.capture());
184         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
185         assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
186                 .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
187     }
188
189     @Test
190     void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
191         // given
192         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
193         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
194
195         final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
196         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
197
198         // when
199         final Flux<MessageRouterPublishResponse> responses = cut
200                 .put(jsonPublishRequest, jsonMessagesMaxBatch);
201
202         // then
203         verifySingleResponse(parsedThreeMessages, responses);
204     }
205
206     @Test
207     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
208         // given
209         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
210         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
211
212         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
213         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
214
215         // when
216         final Flux<MessageRouterPublishResponse> responses = cut
217                 .put(plainPublishRequest, plainMessagesMaxBatch);
218
219         // then
220         verifySingleResponse(parsedThreeMessages, responses);
221     }
222
223     @Test
224     void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
225         // given
226         final List<String> threePlainMessages = List.of("I", "like", "cookies");
227         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
228
229         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
230         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
231
232         // when
233         final Flux<MessageRouterPublishResponse> responses = cut
234                 .put(plainPublishRequest, plainMessagesMaxBatch);
235
236         // then
237         verifySingleResponse(parsedThreeMessages, responses);
238     }
239
240     @Test
241     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
242         // given
243         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
244         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
245
246         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
247         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
248
249         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
250                 threeJsonMessages.appendAll(twoJsonMessages));
251         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
252
253         // when
254         final Flux<MessageRouterPublishResponse> responses = cut
255                 .put(jsonPublishRequest, doubleJsonMessageBatch);
256         // then
257         responses.then().block();
258
259         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
260         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
261         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
262
263         final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
264         assertThat(firstRequest.size()).describedAs("Http request first batch size")
265                 .isEqualTo(MAX_BATCH_SIZE);
266         assertListsContainSameElements(firstRequest, parsedThreeMessages);
267
268         final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
269         assertThat(secondRequest.size()).describedAs("Http request second batch size")
270                 .isEqualTo(MAX_BATCH_SIZE-1);
271         assertListsContainSameElements(secondRequest, parsedTwoMessages);
272     }
273
274     @Test
275     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
276         // given
277         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
278         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
279
280         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
281         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
282
283         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
284                 threeJsonMessages.appendAll(twoJsonMessages));
285         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
286
287         // when
288         final Flux<MessageRouterPublishResponse> responses = cut
289                 .put(plainPublishRequest, doublePlainMessageBatch);
290         // then
291         responses.then().block();
292
293         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
294         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
295         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
296
297         final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
298                 .map(JsonElement::getAsJsonObject);
299         assertThat(firstRequest.size()).describedAs("Http request first batch size")
300                 .isEqualTo(MAX_BATCH_SIZE);
301         assertListsContainSameElements(firstRequest, parsedThreeMessages);
302
303         final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
304                 .map(JsonElement::getAsJsonObject);
305         assertThat(secondRequest.size()).describedAs("Http request second batch size")
306                 .isEqualTo(MAX_BATCH_SIZE-1);
307         assertListsContainSameElements(secondRequest, parsedTwoMessages);
308     }
309
310     @Test
311     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
312         // given
313         final List<String> threePlainMessages = List.of("I", "like", "cookies");
314         final List<String> twoPlainMessages = List.of("and", "pierogi");
315
316         final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
317         final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
318
319         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
320                 threePlainMessages.appendAll(twoPlainMessages));
321         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
322
323         // when
324         final Flux<MessageRouterPublishResponse> responses = cut
325                 .put(plainPublishRequest, doublePlainMessageBatch);
326         // then
327         responses.then().block();
328
329         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
330         final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
331         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
332
333         final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
334                 .map(JsonElement::getAsJsonPrimitive);
335         assertThat(firstRequest.size()).describedAs("Http request first batch size")
336                 .isEqualTo(MAX_BATCH_SIZE);
337         assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
338
339         final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
340                 .map(JsonElement::getAsJsonPrimitive);
341         assertThat(secondRequest.size()).describedAs("Http request second batch size")
342                 .isEqualTo(MAX_BATCH_SIZE-1);
343         assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
344     }
345
346     @Test
347     void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
348         // given
349         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
350         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
351
352         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
353         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
354
355         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
356         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
357
358         // when
359         final Flux<MessageRouterPublishResponse> responses = cut
360                 .put(jsonPublishRequest, doubleJsonMessageBatch);
361
362         // then
363         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
364     }
365
366     @Test
367     void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
368         // given
369         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
370         final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
371
372         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
373         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
374
375         final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
376         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
377
378         // when
379         final Flux<MessageRouterPublishResponse> responses = cut
380                 .put(plainPublishRequest, doubleJsonMessageBatch);
381
382         // then
383         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
384     }
385
386     @Test
387     void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
388         // given
389         final List<String> threePlainMessages = List.of("I", "like", "cookies");
390         final List<String> twoPlainMessages = List.of("and", "pierogi");
391
392         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
393         final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
394
395         final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
396                 threePlainMessages.appendAll(twoPlainMessages));
397         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
398
399         // when
400         final Flux<MessageRouterPublishResponse> responses = cut
401                 .put(plainPublishRequest, doublePlainMessageBatch);
402
403         // then
404         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
405     }
406
407     private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
408         return plainTextMessages
409                 .map(message -> String.format("{\"message\":\"%s\"}", message));
410     }
411
412     private static HttpResponse createHttpResponse(String statusReason, int statusCode){
413         return ImmutableHttpResponse.builder()
414                 .statusCode(statusCode)
415                 .url(TOPIC_URL)
416                 .statusReason(statusReason)
417                 .rawBody("[]".getBytes())
418                 .build();
419     }
420
421     private String collectNonEmptyRequestBody(HttpRequest httpRequest){
422         final String body = Flux.from(httpRequest.body().contents())
423                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
424                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
425                 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
426                 .block();
427         assertThat(body).describedAs("request body").isNotBlank();
428
429         return body;
430     }
431
432     private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){
433         return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
434     }
435
436     private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){
437         return getAsJsonElements(
438                 List.of(
439                         collectNonEmptyRequestBody(httpRequest)
440                                 .split("\n")
441                 )
442         );
443     }
444
445     private void assertListsContainSameElements(List<? extends  JsonElement> actualMessages,
446             List<? extends JsonElement> expectedMessages){
447         for (int i = 0; i < actualMessages.size(); i++) {
448             assertThat(actualMessages.get(i))
449                     .describedAs(String.format("Http request element at position %d", i))
450                     .isEqualTo(expectedMessages.get(i));
451         }
452     }
453
454     private void assertListsContainSameElements(JsonArray actualMessages,
455             List<? extends JsonElement> expectedMessages){
456         assertThat(actualMessages.size()).describedAs("Http request batch size")
457                 .isEqualTo(expectedMessages.size());
458
459         for (int i = 0; i < actualMessages.size(); i++) {
460             assertThat(actualMessages.get(i))
461                     .describedAs(String.format("Http request element at position %d", i))
462                     .isEqualTo(expectedMessages.get(i));
463         }
464     }
465
466     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
467             Flux<MessageRouterPublishResponse> responses) {
468         StepVerifier.create(responses)
469                 .consumeNextWith(response -> {
470                     assertThat(response.successful()).describedAs("successful").isTrue();
471                     assertThat(response.items()).containsExactly(
472                             threeMessages.get(0),
473                             threeMessages.get(1),
474                             threeMessages.get(2));
475                 })
476                 .expectComplete()
477                 .verify(TIMEOUT);
478     }
479
480     private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
481             List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
482
483         StepVerifier.create(responses)
484                 .consumeNextWith(response -> {
485                     assertThat(response.successful()).describedAs("successful").isTrue();
486                     assertThat(response.items()).containsExactly(
487                             threeMessages.get(0),
488                             threeMessages.get(1),
489                             threeMessages.get(2));
490                 })
491                 .consumeNextWith(response -> {
492                     assertThat(response.successful()).describedAs("successful").isTrue();
493                     assertThat(response.items()).containsExactly(
494                             twoMessages.get(0),
495                             twoMessages.get(1));
496                 })
497                 .expectComplete()
498                 .verify(TIMEOUT);
499     }
500 }