Write sample integration tests that use testcontainers 13/88613/8
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>
Tue, 28 May 2019 06:07:06 +0000 (08:07 +0200)
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>
Thu, 6 Jun 2019 08:31:17 +0000 (10:31 +0200)
Also refctor MRPublisher unit tests to pass json messages in correct format

Change-Id: Ia8246ded00e11d8ce90c2d84091249134548be01
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1536

rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberCIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java

index 5b11350..1f0fdaf 100644 (file)
@@ -22,15 +22,16 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import com.google.gson.JsonPrimitive;
 import java.io.File;
 import java.net.URL;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
@@ -50,15 +51,12 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-@Disabled("Disabled until fix messages formatting in MessageRouterPublisher::put ")
 @Testcontainers
 class MessageRouterSubscriberCIT {
     private static final Gson gson = new Gson();
+    private static final JsonParser parser = new JsonParser();
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
-    private static final List<String> messageBatchItems = Arrays.asList("I", "like", "pizza");
-    private static final Flux<JsonPrimitive> messageBatch = Flux.fromIterable(messageBatchItems)
-            .map(JsonPrimitive::new);
     private static final String CONSUMER_GROUP = "group1";
     private static final String CONSUMER_ID = "consumer200";
     private static final String DMAAP_SERVICE_NAME = "dmaap";
@@ -82,7 +80,7 @@ class MessageRouterSubscriberCIT {
 
     private MessageRouterPublisher publisher = DmaapClientFactory
             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-    private MessageRouterSubscriber sut = DmaapClientFactory
+    private MessageRouterSubscriber subscriber = DmaapClientFactory
             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
 
@@ -105,7 +103,7 @@ class MessageRouterSubscriberCIT {
                 .build();
 
         //when
-        Mono<MessageRouterSubscribeResponse> response = sut
+        Mono<MessageRouterSubscribeResponse> response = subscriber
                 .get(mrSubscribeRequest);
 
         //then
@@ -116,12 +114,15 @@ class MessageRouterSubscriberCIT {
     }
 
     @Test
-    void subscriber_shouldGetCorrectResponse() {
+    void subscriberShouldHandleSingleItemResponse(){
         //given
         final String topic = "TOPIC";
-        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic, "text/plain");
+        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
-        final JsonArray expectedItems = getAsJsonArray(messageBatchItems);
+
+        final List<String> singleJsonMessage = Arrays.asList("{\"message\":\"message1\"}");
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final JsonArray expectedItems = getAsJsonArray(singleJsonMessage);
         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
                 .builder()
                 .items(expectedItems)
@@ -130,8 +131,8 @@ class MessageRouterSubscriberCIT {
         //when
         registerTopic(publishRequest, subscribeRequest);
         Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, messageBatch)
-                .then(sut.get(subscribeRequest));
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
 
         //then
         StepVerifier.create(response)
@@ -140,6 +141,84 @@ class MessageRouterSubscriberCIT {
                 .verify();
     }
 
+    @Test
+    void subscriber_shouldHandleMultipleItemsResponse() {
+        //given
+        final String topic = "TOPIC2";
+        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
+
+        final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+        final JsonArray expectedItems = getAsJsonArray(twoJsonMessages);
+        final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+                .builder()
+                .items(expectedItems)
+                .build();
+
+        //when
+        registerTopic(publishRequest, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+    }
+
+    @Test
+    void subscriber_shouldExtractItemsFromResponse() {
+        //given
+        final String topic = "TOPIC3";
+        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
+
+        final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+        //when
+        registerTopic(publishRequest, subscribeRequest);
+        final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
+                .thenMany(subscriber.getElements(subscribeRequest).map(JsonElement::getAsString));
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(twoJsonMessages.get(0))
+                .expectNext(twoJsonMessages.get(1))
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void subscriber_shouldSubscribeToTopic(){
+        //given
+        final String topic = "TOPIC4";
+        final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
+
+        final List<String> twoJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
+
+        //when
+        registerTopic(publishRequest, subscribeRequest);
+        final Flux<String> result = publisher.put(publishRequest, jsonMessageBatch)
+                .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1))
+                        .map(JsonElement::getAsString));
+
+        //then
+        StepVerifier.create(result.take(2))
+                .expectNext(twoJsonMessages.get(0))
+                .expectNext(twoJsonMessages.get(1))
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
     private static String getDockerComposeFilePath(String resourceName){
         URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
                 .getResource(resourceName);
@@ -149,8 +228,7 @@ class MessageRouterSubscriberCIT {
                 .format("File %s does not exist", resourceName));
     }
 
-    private static MessageRouterPublishRequest createMRPublishRequest(String topic,
-            String contentType) {
+    private static MessageRouterPublishRequest createMRPublishRequest(String topic){
         MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
                 .name("the topic")
                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
@@ -158,7 +236,6 @@ class MessageRouterSubscriberCIT {
 
         return ImmutableMessageRouterPublishRequest.builder()
                 .sinkDefinition(sinkDefinition)
-                .contentType(contentType)
                 .build();
     }
 
@@ -178,14 +255,21 @@ class MessageRouterSubscriberCIT {
 
     private void registerTopic(MessageRouterPublishRequest publishRequest,
             MessageRouterSubscribeRequest subscribeRequest) {
-        Flux<JsonPrimitive> sampleMessage = Flux.just("sample message").map(JsonPrimitive::new);
+        final List<String> sampleJsonMessages = Arrays.asList("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonObject> jsonMessageBatch = Flux.fromIterable(sampleJsonMessages)
+                .map(parser::parse).map(JsonElement::getAsJsonObject);
 
-        publisher.put(publishRequest, sampleMessage).blockLast();
-        sut.get(subscribeRequest).block();
+        publisher.put(publishRequest, jsonMessageBatch).blockLast();
+        subscriber.get(subscribeRequest).block();
     }
 
     private JsonArray getAsJsonArray(List<String> list) {
         String listsJsonString = gson.toJson(list);
         return new JsonParser().parse(listsJsonString).getAsJsonArray();
     }
+
+    private static Flux<JsonObject> jsonBatch(List<String> messages){
+        return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+    }
 }
index f0138d2..f44e86a 100644 (file)
@@ -27,21 +27,24 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import com.google.gson.Gson;
 import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import io.netty.buffer.ByteBuf;
+import com.google.gson.JsonParser;
+import com.google.gson.Gson;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
+import io.netty.handler.codec.http.HttpHeaderValues;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
-import org.mockito.verification.VerificationMode;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
@@ -60,34 +63,29 @@ import reactor.test.StepVerifier;
  * @since April 2019
  */
 class MessageRouterPublisherImplTest {
-
     private static final Duration TIMEOUT = Duration.ofSeconds(5);
-    private final RxHttpClient httpClient = mock(RxHttpClient.class);
-    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
-
-    private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
-    private final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+    private static final JsonParser parser = new JsonParser();
+    private static final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
             .name("the topic")
             .topicUrl("https://dmaap-mr/TOPIC")
             .build();
-    private final MessageRouterPublishRequest mrRequest = ImmutableMessageRouterPublishRequest.builder()
-            .sinkDefinition(sinkDefinition)
-            .build();
-    private final HttpResponse httpResponse = ImmutableHttpResponse.builder()
-            .statusCode(200)
-            .statusReason("OK")
-            .url(sinkDefinition.topicUrl())
-            .rawBody("[]".getBytes())
-            .build();
+    private final RxHttpClient httpClient = mock(RxHttpClient.class);
+    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
+    private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
+    private final MessageRouterPublishRequest mrRequestTextPlain = createMRRPublishRequest();
+    private final MessageRouterPublishRequest mrRequestJson = createMRRPublishRequest();
+    private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
 
     @Test
     void puttingElementsShouldYieldNonChunkedHttpRequest() {
         // given
-        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
+                .put(mrRequestTextPlain, singleJsonMessageBatch);
         responses.then().block();
 
         // then
@@ -102,55 +100,81 @@ class MessageRouterPublisherImplTest {
     @Test
     void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() {
         // given
-        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
+                .put(mrRequestJson, singleJsonMessageBatch);
         responses.then().block();
 
         // then
         verify(httpClient).call(httpRequestArgumentCaptor.capture());
         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
-        final JsonArray elementsInRequest = extractNonEmptyRequestBody(httpRequest);
+        final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
         assertThat(elementsInRequest.size()).isEqualTo(3);
-        assertThat(elementsInRequest.get(0).getAsString()).isEqualTo("I");
-        assertThat(elementsInRequest.get(1).getAsString()).isEqualTo("like");
-        assertThat(elementsInRequest.get(2).getAsString()).isEqualTo("cookies");
+        assertThat(elementsInRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
+        assertThat(elementsInRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
+        assertThat(elementsInRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
+    }
+
+    @Test
+    void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
+        // given
+        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(mrRequestJson, singleJsonMessageBatch);
+        responses.then().block();
+
+        // then
+        verify(httpClient).call(httpRequestArgumentCaptor.capture());
+        final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+        assertThat(httpRequest.headers().getOrElse(HttpHeaders.CONTENT_TYPE, ""))
+                .isEqualTo(HttpHeaderValues.APPLICATION_JSON.toString());
     }
 
     @Test
     void puttingLowNumberOfElementsShouldReturnSingleResponse() {
         // given
-        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequest, Flux.just("I", "like", "cookies").map(JsonPrimitive::new));
+                .put(mrRequestJson, singleJsonMessageBatch);
 
         // then
         StepVerifier.create(responses)
                 .consumeNextWith(response -> {
                     assertThat(response.successful()).describedAs("successful").isTrue();
                     assertThat(response.items()).containsExactly(
-                            new JsonPrimitive("I"),
-                            new JsonPrimitive("like"),
-                            new JsonPrimitive("cookies"));
+                            getAsJsonObject(threeJsonMessages.get(0)),
+                            getAsJsonObject(threeJsonMessages.get(1)),
+                            getAsJsonObject(threeJsonMessages.get(2)));
                 })
                 .expectComplete()
                 .verify(TIMEOUT);
     }
 
-
     @Test
     void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() {
         // given
-        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
+        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
+                threeJsonMessages, twoJsonMessages));
+
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
-
+                .put(mrRequestJson, doubleJsonMessageBatch);
         // then
         responses.then().block();
 
@@ -158,53 +182,97 @@ class MessageRouterPublisherImplTest {
         final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues();
         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
 
-        final JsonArray firstRequest = extractNonEmptyRequestBody(httpRequests.get(0));
+        final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
         assertThat(firstRequest.size()).isEqualTo(3);
-        assertThat(firstRequest.get(0).getAsString()).isEqualTo("I");
-        assertThat(firstRequest.get(1).getAsString()).isEqualTo("like");
-        assertThat(firstRequest.get(2).getAsString()).isEqualTo("cookies");
+        assertThat(firstRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
+        assertThat(firstRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
+        assertThat(firstRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
 
-        final JsonArray secondRequest = extractNonEmptyRequestBody(httpRequests.get(1));
+        final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
         assertThat(secondRequest.size()).isEqualTo(2);
-        assertThat(secondRequest.get(0).getAsString()).isEqualTo("and");
-        assertThat(secondRequest.get(1).getAsString()).isEqualTo("pierogi");
+        assertThat(secondRequest.get(0).toString()).isEqualTo(twoJsonMessages.get(0));
+        assertThat(secondRequest.get(1).toString()).isEqualTo(twoJsonMessages.get(1));
     }
 
     @Test
     void puttingHighNumberOfElementsShouldReturnMoreResponses() {
         // given
-        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
+        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
+                threeJsonMessages, twoJsonMessages));
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequest, Flux.just("I", "like", "cookies", "and", "pierogi").map(JsonPrimitive::new));
+                .put(mrRequestJson, doubleJsonMessageBatch);
 
         // then
         StepVerifier.create(responses)
                 .consumeNextWith(response -> {
                     assertThat(response.successful()).describedAs("successful").isTrue();
                     assertThat(response.items()).containsExactly(
-                            new JsonPrimitive("I"),
-                            new JsonPrimitive("like"),
-                            new JsonPrimitive("cookies"));
+                            getAsJsonObject(threeJsonMessages.get(0)),
+                            getAsJsonObject(threeJsonMessages.get(1)),
+                            getAsJsonObject(threeJsonMessages.get(2)));
                 })
                 .consumeNextWith(response -> {
                     assertThat(response.successful()).describedAs("successful").isTrue();
                     assertThat(response.items()).containsExactly(
-                            new JsonPrimitive("and"),
-                            new JsonPrimitive("pierogi"));
+                            getAsJsonObject(twoJsonMessages.get(0)),
+                            getAsJsonObject(twoJsonMessages.get(1)));
                 })
                 .expectComplete()
                 .verify(TIMEOUT);
     }
 
-    private JsonArray extractNonEmptyRequestBody(HttpRequest httpRequest) {
+    private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
+        return plainTextMessages.stream()
+                .map(message -> String.format("{\"message\":\"%s\"}", message))
+                .collect(Collectors.toList());
+    }
+
+
+    private static Flux<JsonObject> jsonBatch(List<String> messages){
+        return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+    }
+
+    private static List<String> concat(List<String> firstList, List<String> secondList){
+        return Stream.concat(firstList.stream(), secondList.stream()).collect(Collectors.toList());
+    }
+
+    private static HttpResponse createHttpResponse(String statusReason, int statusCode){
+        return ImmutableHttpResponse.builder()
+                .statusCode(statusCode)
+                .url(sinkDefinition.topicUrl())
+                .statusReason(statusReason)
+                .rawBody("[]".getBytes())
+                .build();
+    }
+
+    private static MessageRouterPublishRequest createMRRPublishRequest(){
+        return ImmutableMessageRouterPublishRequest
+                .builder()
+                .sinkDefinition(sinkDefinition)
+                .build();
+    }
+
+    private String collectNonEmptyRequestBody(HttpRequest httpRequest){
         final String body = Flux.from(httpRequest.body().contents())
                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
                 .map(byteBufs -> byteBufs.toString(StandardCharsets.UTF_8))
                 .block();
         assertThat(body).describedAs("request body").isNotBlank();
-        return new Gson().fromJson(body, JsonArray.class);
+
+        return body;
+    }
+
+    private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){
+        return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
+    }
+
+    private JsonObject getAsJsonObject(String item){
+        return new Gson().fromJson(item, JsonObject.class);
     }
 }
\ No newline at end of file