Add text/plain content type handling in Publisher 70/90070/6
authorIzabela Zawadzka <izabela.zawadzka@nokia.com>
Fri, 14 Jun 2019 13:26:29 +0000 (15:26 +0200)
committerIzabela Zawadzka <izabela.zawadzka@nokia.com>
Wed, 26 Jun 2019 07:23:52 +0000 (09:23 +0200)
Change-Id: I51e17d64f813e16b81385abb8aa862ee1f927d35
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1630

rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java [moved from rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterTestsUtils.java with 70% similarity]
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java

diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/ContentType.java
new file mode 100644 (file)
index 0000000..80a28d6
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.util.AsciiString;
+
+public enum ContentType {
+    APPLICATION_JSON(HttpHeaderValues.APPLICATION_JSON),
+    TEXT_PLAIN(HttpHeaderValues.TEXT_PLAIN);
+
+    private AsciiString contentType;
+
+    ContentType(AsciiString contentType) {
+        this.contentType = contentType;
+    }
+
+    @Override
+    public String toString(){
+        return contentType.toString();
+    }
+}
index aa88b9e..191ec64 100644 (file)
@@ -27,6 +27,7 @@ import com.google.gson.JsonElement;
 import io.vavr.collection.HashMap;
 import io.vavr.collection.List;
 import java.time.Duration;
+import java.util.stream.Collectors;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -35,6 +36,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
@@ -74,14 +76,20 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
             List<JsonElement> batch) {
         LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size());
         LOGGER.trace("The items to be sent: {}", batch);
-        return httpClient.call(buildHttpRequest(request, asJsonBody(batch)))
+        return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
                 .map(httpResponse -> buildResponse(httpResponse, batch));
     }
 
-    private @NotNull RequestBody asJsonBody(List<? extends JsonElement> subItems) {
-        final JsonArray elements = new JsonArray(subItems.size());
-        subItems.forEach(elements::add);
-        return RequestBody.fromJson(elements);
+    private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
+        if(contentType == ContentType.APPLICATION_JSON) {
+            final JsonArray elements = new JsonArray(subItems.size());
+            subItems.forEach(elements::add);
+            return RequestBody.fromJson(elements);
+        }else if(contentType == ContentType.TEXT_PLAIN){
+            String messages = subItems.map(JsonElement::toString)
+                    .collect(Collectors.joining("\n"));
+            return RequestBody.fromString(messages);
+        }else throw new IllegalArgumentException("Unsupported content type: " + contentType);
     }
 
     private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
@@ -89,7 +97,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
                 .method(HttpMethod.POST)
                 .url(request.sinkDefinition().topicUrl())
                 .diagnosticContext(request.diagnosticContext().withNewInvocationId())
-                .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType()))
+                .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString()))
                 .body(body)
                 .build();
     }
@@ -98,6 +106,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
             HttpResponse httpResponse, List<JsonElement> batch) {
         final ImmutableMessageRouterPublishResponse.Builder builder =
                 ImmutableMessageRouterPublishResponse.builder();
+
         return httpResponse.successful()
                 ? builder.items(batch).build()
                 : builder.failReason(extractFailReason(httpResponse)).build();
index 77f92e7..2990413 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -34,7 +35,7 @@ public interface MessageRouterPublishRequest extends DmaapRequest {
     MessageRouterSink sinkDefinition();
 
     @Value.Default
-    default String contentType() {
-        return "application/json";
+    default ContentType contentType() {
+        return ContentType.APPLICATION_JSON;
     }
 }
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
@@ -29,6 +29,8 @@ import io.vavr.collection.List;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
@@ -39,11 +41,16 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import reactor.core.publisher.Flux;
 
-final class MessageRouterTestsUtils {
+
+public final class MessageRouterTestsUtils {
     private static final JsonParser parser = new JsonParser();
     private MessageRouterTestsUtils() {}
 
-    static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+    public static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+        return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON);
+    }
+
+    public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){
         MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
                 .name("the topic")
                 .topicUrl(topicUrl)
@@ -51,10 +58,11 @@ final class MessageRouterTestsUtils {
 
         return ImmutableMessageRouterPublishRequest.builder()
                 .sinkDefinition(sinkDefinition)
+                .contentType(contentType)
                 .build();
     }
 
-    static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+    public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
             String consumerGroup, String consumerId) {
         ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
                 .name("the topic")
@@ -69,51 +77,59 @@ final class MessageRouterTestsUtils {
                 .build();
     }
 
-    static List<JsonElement> getAsJsonElements(List<String> messages){
+    public static List<JsonElement> getAsJsonElements(List<String> messages){
         return messages.map(parser::parse);
     }
 
-    static JsonObject getAsJsonObject(String item){
+    public static List<JsonObject> getAsJsonObjects(List<String> messages){
+        return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+    }
+
+    public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages){
+        return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive);
+    }
+
+    public static JsonObject getAsJsonObject(String item){
         return new Gson().fromJson(item, JsonObject.class);
     }
 
-    static Flux<JsonObject> jsonBatch(List<String> messages){
-        return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+    public static Flux<JsonElement> plainBatch(List<String> messages){
+        return Flux.fromIterable(getAsJsonElements(messages));
     }
 
-    static Flux<JsonPrimitive> plainBatch(List<String> messages){
-        return Flux.fromIterable(messages).map(JsonPrimitive::new);
+    public static Flux<JsonObject> jsonBatch(List<String> messages){
+        return Flux.fromIterable(getAsJsonObjects(messages));
     }
 
-    static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+    public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
         return ImmutableMessageRouterSubscribeResponse
                 .builder()
                 .failReason(String.format(failReasonFormat, formatArgs))
                 .build();
     }
 
-    static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+    public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
         return ImmutableMessageRouterSubscribeResponse
                 .builder()
                 .items(items)
                 .build();
     }
 
-    static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+    public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
         return ImmutableMessageRouterPublishResponse
                 .builder()
                 .failReason(String.format(failReasonFormat, formatArgs))
                 .build();
     }
 
-    static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+    public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
         return ImmutableMessageRouterPublishResponse
                 .builder()
                 .items(items)
                 .build();
     }
 
-    static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
+    public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
             MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
         final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
                 "{\"differentMessage\":\"message2\"}");
index c746bfe..0afea74 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
 import java.time.Duration;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
@@ -95,7 +95,7 @@ class MessageRouterPublisherIT {
         //given
         final String topic = "TOPIC2";
         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
-        final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
+        final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
@@ -112,6 +112,7 @@ class MessageRouterPublisherIT {
 
     @Test
     void publisher_shouldSuccessfullyPublishSingleMessage(){
+        //given
         final String topic = "TOPIC3";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
@@ -136,7 +137,7 @@ class MessageRouterPublisherIT {
 
     @Test
     void publisher_shouldSuccessfullyPublishMultipleMessages(){
-        final String topic = "TOPIC4";
+        final String topic = "TOPIC5";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
                 "{\"differentMessage\":\"message2\"}");
@@ -158,4 +159,112 @@ class MessageRouterPublisherIT {
                 .expectComplete()
                 .verify();
     }
+
+    @Test
+    void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
+        //given
+        final String topic = "TOPIC6";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, plainBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+    }
+
+    @Test
+    void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
+        //given
+        final String topic = "TOPIC7";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+        final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
+        final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, plainBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+    }
+
+    @Test
+    void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
+        //given
+        final String topic = "TOPIC8";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+        final List<String> singlePlainMessage = List.of("kebab");
+        final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
+        final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, plainBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+    }
+
+    @Test
+    void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
+        //given
+        final String topic = "TOPIC9";
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+
+        final List<String> singlePlainMessage = List.of("I", "like", "pizza");
+        final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
+        final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        //when
+        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, plainBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+    }
 }
index c2e96b5..a758b2d 100644 (file)
@@ -20,7 +20,7 @@
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
index f44e86a..38659ac 100644 (file)
@@ -26,32 +26,28 @@ import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
 import com.google.gson.Gson;
+import com.google.gson.JsonPrimitive;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.codec.http.HttpHeaderValues;
+import io.vavr.collection.List;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import reactor.core.publisher.Flux;
@@ -64,71 +60,123 @@ import reactor.test.StepVerifier;
  */
 class MessageRouterPublisherImplTest {
     private static final Duration TIMEOUT = Duration.ofSeconds(5);
-    private static final JsonParser parser = new JsonParser();
-    private static final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
-            .name("the topic")
-            .topicUrl("https://dmaap-mr/TOPIC")
-            .build();
+    private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
+    private static final int MAX_BATCH_SIZE = 3;
     private final RxHttpClient httpClient = mock(RxHttpClient.class);
-    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, 3, Duration.ofMinutes(1));
+    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
-    private final MessageRouterPublishRequest mrRequestTextPlain = createMRRPublishRequest();
-    private final MessageRouterPublishRequest mrRequestJson = createMRRPublishRequest();
+    private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
+    private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
     private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
 
     @Test
     void puttingElementsShouldYieldNonChunkedHttpRequest() {
         // given
-        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequestTextPlain, singleJsonMessageBatch);
+                .put(jsonPublishRequest, singleJsonMessageBatch);
         responses.then().block();
 
         // then
         verify(httpClient).call(httpRequestArgumentCaptor.capture());
         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
-        assertThat(httpRequest.url()).isEqualTo(sinkDefinition.topicUrl());
+        assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
         assertThat(httpRequest.body()).isNotNull();
         assertThat(httpRequest.body().length()).isGreaterThan(0);
     }
 
     @Test
-    void puttingLowNumberOfElementsShouldYieldSingleHttpRequest() {
+    void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
         // given
-        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
-        final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+        final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequestJson, singleJsonMessageBatch);
+                .put(jsonPublishRequest, jsonMessagesMaxBatch);
         responses.then().block();
 
         // then
         verify(httpClient).call(httpRequestArgumentCaptor.capture());
         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
         final JsonArray elementsInRequest = extractNonEmptyJsonRequestBody(httpRequest);
-        assertThat(elementsInRequest.size()).isEqualTo(3);
-        assertThat(elementsInRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
-        assertThat(elementsInRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
-        assertThat(elementsInRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
+
+        assertThat(elementsInRequest.size()).describedAs("Http request batch size")
+                .isEqualTo(MAX_BATCH_SIZE);
+        assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
+    }
+
+
+
+    @Test
+    void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+        // given
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+        final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, plainMessagesMaxBatch);
+        responses.then().block();
+
+        // then
+        verify(httpClient).call(httpRequestArgumentCaptor.capture());
+        final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+        final List<JsonObject> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
+                .map(JsonElement::getAsJsonObject);
+
+
+        assertThat(elementsInRequest.size()).describedAs("Http request batch size")
+                .isEqualTo(MAX_BATCH_SIZE);
+        assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
+    }
+
+    @Test
+    void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+        // given
+        final List<String> threePlainMessages = List.of("I", "like", "cookies");
+        final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
+
+        final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, plainMessagesMaxBatch);
+        responses.then().block();
+
+        // then
+        verify(httpClient).call(httpRequestArgumentCaptor.capture());
+        final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+        final List<JsonPrimitive> elementsInRequest = extractNonEmptyPlainRequestBody(httpRequest)
+                .map(JsonElement::getAsJsonPrimitive);
+
+        assertThat(elementsInRequest.size()).describedAs("Http request batch size")
+                .isEqualTo(MAX_BATCH_SIZE);
+        assertListsContainSameElements(elementsInRequest, parsedThreeMessages);
     }
 
     @Test
     void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
         // given
-        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequestJson, singleJsonMessageBatch);
+                .put(jsonPublishRequest, singleJsonMessageBatch);
         responses.then().block();
 
         // then
@@ -139,124 +187,237 @@ class MessageRouterPublisherImplTest {
     }
 
     @Test
-    void puttingLowNumberOfElementsShouldReturnSingleResponse() {
+    void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
         // given
-        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
-        final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+        final Flux<JsonObject> jsonMessagesMaxBatch = jsonBatch(threeJsonMessages);
         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequestJson, singleJsonMessageBatch);
+                .put(jsonPublishRequest, jsonMessagesMaxBatch);
 
         // then
-        StepVerifier.create(responses)
-                .consumeNextWith(response -> {
-                    assertThat(response.successful()).describedAs("successful").isTrue();
-                    assertThat(response.items()).containsExactly(
-                            getAsJsonObject(threeJsonMessages.get(0)),
-                            getAsJsonObject(threeJsonMessages.get(1)),
-                            getAsJsonObject(threeJsonMessages.get(2)));
-                })
-                .expectComplete()
-                .verify(TIMEOUT);
+        verifySingleResponse(parsedThreeMessages, responses);
     }
 
     @Test
-    void puttingHighNumberOfElementsShouldYieldMultipleHttpRequests() {
+    void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
         // given
-        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
-        final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
-        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
-                threeJsonMessages, twoJsonMessages));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
 
+        final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threeJsonMessages);
         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequestJson, doubleJsonMessageBatch);
+                .put(plainPublishRequest, plainMessagesMaxBatch);
+
+        // then
+        verifySingleResponse(parsedThreeMessages, responses);
+    }
+
+    @Test
+    void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldReturnSingleResponse() {
+        // given
+        final List<String> threePlainMessages = List.of("I", "like", "cookies");
+        final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
+
+        final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(threePlainMessages);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, plainMessagesMaxBatch);
+
+        // then
+        verifySingleResponse(parsedThreeMessages, responses);
+    }
+
+    @Test
+    void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
+        // given
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+        final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(
+                threeJsonMessages.appendAll(twoJsonMessages));
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(jsonPublishRequest, doubleJsonMessageBatch);
         // then
         responses.then().block();
 
         verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
-        final List<HttpRequest> httpRequests = httpRequestArgumentCaptor.getAllValues();
+        final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
         assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
 
         final JsonArray firstRequest = extractNonEmptyJsonRequestBody(httpRequests.get(0));
-        assertThat(firstRequest.size()).isEqualTo(3);
-        assertThat(firstRequest.get(0).toString()).isEqualTo(threeJsonMessages.get(0));
-        assertThat(firstRequest.get(1).toString()).isEqualTo(threeJsonMessages.get(1));
-        assertThat(firstRequest.get(2).toString()).isEqualTo(threeJsonMessages.get(2));
+        assertThat(firstRequest.size()).describedAs("Http request first batch size")
+                .isEqualTo(MAX_BATCH_SIZE);
+        assertListsContainSameElements(firstRequest, parsedThreeMessages);
 
         final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
-        assertThat(secondRequest.size()).isEqualTo(2);
-        assertThat(secondRequest.get(0).toString()).isEqualTo(twoJsonMessages.get(0));
-        assertThat(secondRequest.get(1).toString()).isEqualTo(twoJsonMessages.get(1));
+        assertThat(secondRequest.size()).describedAs("Http request second batch size")
+                .isEqualTo(MAX_BATCH_SIZE-1);
+        assertListsContainSameElements(secondRequest, parsedTwoMessages);
     }
 
     @Test
-    void puttingHighNumberOfElementsShouldReturnMoreResponses() {
+    void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
         // given
-        final List<String> threeJsonMessages = getAsMRJsonMessages(Arrays.asList("I", "like", "cookies"));
-        final List<String> twoJsonMessages = getAsMRJsonMessages(Arrays.asList("and", "pierogi"));
-        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(concat(
-                threeJsonMessages, twoJsonMessages));
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+        final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+        final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
+                threeJsonMessages.appendAll(twoJsonMessages));
         given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
-                .put(mrRequestJson, doubleJsonMessageBatch);
+                .put(plainPublishRequest, doublePlainMessageBatch);
+        // then
+        responses.then().block();
+
+        verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
+        final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
+        assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
+
+        final List<JsonObject> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
+                .map(JsonElement::getAsJsonObject);
+        assertThat(firstRequest.size()).describedAs("Http request first batch size")
+                .isEqualTo(MAX_BATCH_SIZE);
+        assertListsContainSameElements(firstRequest, parsedThreeMessages);
+
+        final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
+                .map(JsonElement::getAsJsonObject);
+        assertThat(secondRequest.size()).describedAs("Http request second batch size")
+                .isEqualTo(MAX_BATCH_SIZE-1);
+        assertListsContainSameElements(secondRequest, parsedTwoMessages);
+    }
+
+    @Test
+    void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldYieldMultipleHttpRequests() {
+        // given
+        final List<String> threePlainMessages = List.of("I", "like", "cookies");
+        final List<String> twoPlainMessages = List.of("and", "pierogi");
+
+        final List<JsonPrimitive> parsedThreePlainMessages = getAsJsonPrimitives(threePlainMessages);
+        final List<JsonPrimitive> parsedTwoPlainMessages = getAsJsonPrimitives(twoPlainMessages);
+
+        final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
+                threePlainMessages.appendAll(twoPlainMessages));
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
 
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, doublePlainMessageBatch);
         // then
-        StepVerifier.create(responses)
-                .consumeNextWith(response -> {
-                    assertThat(response.successful()).describedAs("successful").isTrue();
-                    assertThat(response.items()).containsExactly(
-                            getAsJsonObject(threeJsonMessages.get(0)),
-                            getAsJsonObject(threeJsonMessages.get(1)),
-                            getAsJsonObject(threeJsonMessages.get(2)));
-                })
-                .consumeNextWith(response -> {
-                    assertThat(response.successful()).describedAs("successful").isTrue();
-                    assertThat(response.items()).containsExactly(
-                            getAsJsonObject(twoJsonMessages.get(0)),
-                            getAsJsonObject(twoJsonMessages.get(1)));
-                })
-                .expectComplete()
-                .verify(TIMEOUT);
+        responses.then().block();
+
+        verify(httpClient, times(2)).call(httpRequestArgumentCaptor.capture());
+        final List<HttpRequest> httpRequests = List.ofAll(httpRequestArgumentCaptor.getAllValues());
+        assertThat(httpRequests.size()).describedAs("number of requests").isEqualTo(2);
+
+        final List<JsonPrimitive> firstRequest = extractNonEmptyPlainRequestBody(httpRequests.get(0))
+                .map(JsonElement::getAsJsonPrimitive);
+        assertThat(firstRequest.size()).describedAs("Http request first batch size")
+                .isEqualTo(MAX_BATCH_SIZE);
+        assertListsContainSameElements(firstRequest, parsedThreePlainMessages);
+
+        final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
+                .map(JsonElement::getAsJsonPrimitive);
+        assertThat(secondRequest.size()).describedAs("Http request second batch size")
+                .isEqualTo(MAX_BATCH_SIZE-1);
+        assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
     }
 
-    private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
-        return plainTextMessages.stream()
-                .map(message -> String.format("{\"message\":\"%s\"}", message))
-                .collect(Collectors.toList());
+    @Test
+    void onPut_givenJsonMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
+        // given
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+        final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+        // then
+        verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
+    }
+
+    @Test
+    void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
+        // given
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+        final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+        final Flux<JsonElement> doubleJsonMessageBatch = plainBatch(threeJsonMessages.appendAll(twoJsonMessages));
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, doubleJsonMessageBatch);
+
+        // then
+        verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
     }
 
+    @Test
+    void onPut_givenPlainMessages_whenTheirAmountIsAboveMaxBatchSize_shouldReturnMoreResponses() {
+        // given
+        final List<String> threePlainMessages = List.of("I", "like", "cookies");
+        final List<String> twoPlainMessages = List.of("and", "pierogi");
+
+        final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
+        final List<JsonPrimitive> parsedTwoMessages = getAsJsonPrimitives(twoPlainMessages);
 
-    private static Flux<JsonObject> jsonBatch(List<String> messages){
-        return Flux.fromIterable(messages).map(parser::parse).map(JsonElement::getAsJsonObject);
+        final Flux<JsonElement> doublePlainMessageBatch = plainBatch(
+                threePlainMessages.appendAll(twoPlainMessages));
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(successHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, doublePlainMessageBatch);
+
+        // then
+        verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
     }
 
-    private static List<String> concat(List<String> firstList, List<String> secondList){
-        return Stream.concat(firstList.stream(), secondList.stream()).collect(Collectors.toList());
+    private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
+        return plainTextMessages
+                .map(message -> String.format("{\"message\":\"%s\"}", message));
     }
 
     private static HttpResponse createHttpResponse(String statusReason, int statusCode){
         return ImmutableHttpResponse.builder()
                 .statusCode(statusCode)
-                .url(sinkDefinition.topicUrl())
+                .url(TOPIC_URL)
                 .statusReason(statusReason)
                 .rawBody("[]".getBytes())
                 .build();
     }
 
-    private static MessageRouterPublishRequest createMRRPublishRequest(){
-        return ImmutableMessageRouterPublishRequest
-                .builder()
-                .sinkDefinition(sinkDefinition)
-                .build();
-    }
-
     private String collectNonEmptyRequestBody(HttpRequest httpRequest){
         final String body = Flux.from(httpRequest.body().contents())
                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
@@ -272,7 +433,68 @@ class MessageRouterPublisherImplTest {
         return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
     }
 
-    private JsonObject getAsJsonObject(String item){
-        return new Gson().fromJson(item, JsonObject.class);
+    private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){
+        return getAsJsonElements(
+                List.of(
+                        collectNonEmptyRequestBody(httpRequest)
+                                .split("\n")
+                )
+        );
+    }
+
+    private void assertListsContainSameElements(List<? extends  JsonElement> actualMessages,
+            List<? extends JsonElement> expectedMessages){
+        for (int i = 0; i < actualMessages.size(); i++) {
+            assertThat(actualMessages.get(i))
+                    .describedAs(String.format("Http request element at position %d", i))
+                    .isEqualTo(expectedMessages.get(i));
+        }
+    }
+
+    private void assertListsContainSameElements(JsonArray actualMessages,
+            List<? extends JsonElement> expectedMessages){
+        assertThat(actualMessages.size()).describedAs("Http request batch size")
+                .isEqualTo(expectedMessages.size());
+
+        for (int i = 0; i < actualMessages.size(); i++) {
+            assertThat(actualMessages.get(i))
+                    .describedAs(String.format("Http request element at position %d", i))
+                    .isEqualTo(expectedMessages.get(i));
+        }
+    }
+
+    private void verifySingleResponse(List<? extends JsonElement> threeMessages,
+            Flux<MessageRouterPublishResponse> responses) {
+        StepVerifier.create(responses)
+                .consumeNextWith(response -> {
+                    assertThat(response.successful()).describedAs("successful").isTrue();
+                    assertThat(response.items()).containsExactly(
+                            threeMessages.get(0),
+                            threeMessages.get(1),
+                            threeMessages.get(2));
+                })
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
+            List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
+
+        StepVerifier.create(responses)
+                .consumeNextWith(response -> {
+                    assertThat(response.successful()).describedAs("successful").isTrue();
+                    assertThat(response.items()).containsExactly(
+                            threeMessages.get(0),
+                            threeMessages.get(1),
+                            threeMessages.get(2));
+                })
+                .consumeNextWith(response -> {
+                    assertThat(response.successful()).describedAs("successful").isTrue();
+                    assertThat(response.items()).containsExactly(
+                            twoMessages.get(0),
+                            twoMessages.get(1));
+                })
+                .expectComplete()
+                .verify(TIMEOUT);
     }
 }
\ No newline at end of file