import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockserver.client.MockServerClient;
-import org.mockserver.matchers.TimeToLive;
 import org.mockserver.matchers.Times;
 import org.mockserver.verify.VerificationTimes;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
                 .respond(response().withStatusCode(404));
-        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+                retryConfig(1, 1));
 
         //when
         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
         final String path = String.format("/events/%s", topic);
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
-                .respond(response().withDelay(TimeUnit.SECONDS, 10));
-        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+                retryConfig(1, 1));
 
         //when
         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
 
-    private MessageRouterPublisherConfig retryConfig() {
+    @Test
+    void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
+        final String topic = "TOPIC13";
+        final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
+        final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+        final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+        final String path = String.format("/events/%s", topic);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(500));
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+    }
+
+    private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
         return ImmutableMessageRouterPublisherConfig.builder()
                 .retryConfig(ImmutableDmaapRetryConfig.builder()
-                        .retryIntervalInSeconds(1)
-                        .retryCount(1)
+                        .retryIntervalInSeconds(retryInterval)
+                        .retryCount(retryCount)
                         .build())
                 .build();
     }
 
         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
-                .respond(response().withDelay(TimeUnit.SECONDS, 5));
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
 
         //when
         Mono<MessageRouterSubscribeResponse> response = subscriber
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
                 .respond(response().withStatusCode(404));
-        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+                retryConfig(1, 1));
 
         //when
         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
-                .respond(response().withDelay(TimeUnit.SECONDS, 10));
-        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+                retryConfig(1, 1));
 
         //when
         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
 
-    private MessageRouterSubscriberConfig retryConfig() {
+    @Test
+    void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
+        //given
+        final String topic = "TOPIC8";
+        final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+                proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(500));
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+                retryConfig(1, 5));
+
+        //when
+        registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+                createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+    }
+
+    private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
         return ImmutableMessageRouterSubscriberConfig.builder()
                 .retryConfig(ImmutableDmaapRetryConfig.builder()
-                        .retryIntervalInSeconds(1)
-                        .retryCount(1)
+                        .retryIntervalInSeconds(retryInterval)
+                        .retryCount(retryCount)
                         .build())
                 .build();
     }
 
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Mono;
 import reactor.netty.ByteBufFlux;
+import reactor.netty.ByteBufMono;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
     }
 
     static RequestBody fromString(String contents, Charset charset) {
-        ByteBuf encodedContents = ByteBufAllocator.DEFAULT.buffer();
-        encodedContents.writeCharSequence(contents, charset);
-
         return ImmutableRequestBody.builder()
-                .length(encodedContents.readableBytes())
-                .contents(Mono.just(encodedContents.retain()))
+                .length(contents.length())
+                .contents(ByteBufMono.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT))
                 .build();
     }
 
 
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.client.HttpClient.ResponseReceiver;
         return theClient
                 .headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
                 .request(request.method().asNetty())
-                .send(Flux.from(request.body().contents()))
+                .send(request.body().contents())
                 .uri(request.url());
     }
 
         return theClient
                 .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
                 .request(request.method().asNetty())
-                .send(Flux.from(request.body().contents()))
+                .send(request.body().contents())
                 .uri(request.url());
     }