TCA: Replace any openecomp reference by onap
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / onap / dcae / apod / analytics / dmaap / service / BaseDMaaPMRComponent.java
-/*\r
- * ===============================LICENSE_START======================================\r
- *  dcae-analytics\r
- * ================================================================================\r
- *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- *  Licensed under the Apache License, Version 2.0 (the "License");\r
- *  you may not use this file except in compliance with the License.\r
- *   You may obtain a copy of the License at\r
- *\r
- *          http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- *  Unless required by applicable law or agreed to in writing, software\r
- *  distributed under the License is distributed on an "AS IS" BASIS,\r
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- *  See the License for the specific language governing permissions and\r
- *  limitations under the License.\r
- *  ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.dmaap.service;\r
-\r
-import com.fasterxml.jackson.core.JsonProcessingException;\r
-import com.fasterxml.jackson.databind.JsonNode;\r
-import com.fasterxml.jackson.databind.ObjectMapper;\r
-import com.google.common.base.Optional;\r
-import org.apache.commons.codec.binary.Base64;\r
-import org.apache.commons.lang3.StringEscapeUtils;\r
-import org.apache.commons.lang3.tuple.ImmutablePair;\r
-import org.apache.commons.lang3.tuple.Pair;\r
-import org.apache.http.HttpEntity;\r
-import org.apache.http.HttpResponse;\r
-import org.apache.http.client.ResponseHandler;\r
-import org.apache.http.client.utils.URIBuilder;\r
-import org.apache.http.util.EntityUtils;\r
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
-import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;\r
-import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import java.io.IOException;\r
-import java.net.URI;\r
-import java.net.URISyntaxException;\r
-import java.nio.charset.Charset;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-\r
-import javax.annotation.Nonnull;\r
-import javax.annotation.Nullable;\r
-\r
-import static java.lang.String.format;\r
-\r
-/**\r
- * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods\r
- *\r
- * @author Rajiv Singla . Creation Date: 11/1/2016.\r
- */\r
-public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {\r
-\r
-    private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);\r
-\r
-    private static final ObjectMapper objectMapper = new ObjectMapper();\r
-\r
-    /**\r
-     * Creates Base64 encoded Auth Header for given userName and Password\r
-     * If either user name of password are null return absent\r
-     *\r
-     * @param userName username\r
-     * @param userPassword user password\r
-     * @return base64 encoded auth header if username or password are both non null\r
-     */\r
-    protected static Optional<String> getAuthHeader(@Nullable final String userName,\r
-                                                    @Nullable final String userPassword) {\r
-        if (userName == null || userPassword == null) {\r
-            return Optional.absent();\r
-        } else {\r
-            final String auth = userName + ":" + userPassword;\r
-            final Charset isoCharset = Charset.forName("ISO-8859-1");\r
-            byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));\r
-            return Optional.of("Basic " + new String(encodedAuth, isoCharset));\r
-        }\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}\r
-     *\r
-     * @param publisherConfig publisher settings\r
-     *\r
-     * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic\r
-     */\r
-    protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {\r
-        final String hostName = publisherConfig.getHostName();\r
-        final Integer portNumber = publisherConfig.getPortNumber();\r
-        final String getProtocol = publisherConfig.getProtocol();\r
-        final String topicName = publisherConfig.getTopicName();\r
-        URI publisherURI = null;\r
-        try {\r
-            publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
-                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();\r
-        } catch (URISyntaxException e) {\r
-            final String errorMessage = format("Error while creating publisher URI: %s", e);\r
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-        }\r
-        LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);\r
-        return publisherURI;\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}\r
-     *\r
-     * @param subscriberConfig subscriber settings\r
-     *\r
-     * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic\r
-     */\r
-    protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {\r
-        final String hostName = subscriberConfig.getHostName();\r
-        final Integer portNumber = subscriberConfig.getPortNumber();\r
-        final String getProtocol = subscriberConfig.getProtocol();\r
-        final String topicName = subscriberConfig.getTopicName();\r
-        final String consumerId = subscriberConfig.getConsumerId();\r
-        final String consumerGroup = subscriberConfig.getConsumerGroup();\r
-        final Integer timeoutMS = subscriberConfig.getTimeoutMS();\r
-        final Integer messageLimit = subscriberConfig.getMessageLimit();\r
-        URI subscriberURI = null;\r
-        try {\r
-            URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
-                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX\r
-                            + topicName + "/"\r
-                            + consumerGroup + "/" +\r
-                            consumerId);\r
-            // add query params if present\r
-            if (timeoutMS > 0) {\r
-                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());\r
-            }\r
-            if (messageLimit > 0) {\r
-                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,\r
-                        messageLimit.toString());\r
-            }\r
-            subscriberURI = uriBuilder.build();\r
-\r
-        } catch (URISyntaxException e) {\r
-            final String errorMessage = format("Error while creating subscriber URI: %s", e);\r
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-        }\r
-\r
-        LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);\r
-        return subscriberURI;\r
-    }\r
-\r
-\r
-    /**\r
-     *  Creates 202 (Accepted) Response code message\r
-     *\r
-     * @param batchQueueSize batch Queue size\r
-     *\r
-     * @return response with 202 message code\r
-     */\r
-    protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {\r
-        return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,\r
-                "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);\r
-    }\r
-\r
-\r
-    /**\r
-     *  Creates 204 (No Content) Response code message\r
-     *\r
-     * @return response with 204 message code\r
-     */\r
-    protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {\r
-        return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,\r
-                "No Content - No Messages in batch queue for flushing to MR Topic", 0);\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates Publisher Response for given response code, response Message and pending Message Count\r
-     *\r
-     * @param responseCode HTTP Status Code\r
-     * @param responseMessage response message\r
-     * @param pendingMessages pending messages in batch queue\r
-     *\r
-     * @return DMaaP MR Publisher Response\r
-     */\r
-    protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String\r
-            responseMessage, int pendingMessages) {\r
-        return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);\r
-    }\r
-\r
-\r
-    /**\r
-     * Returns weekly consistent pending messages in batch queue\r
-     *\r
-     * @param publisherQueue batch queue\r
-     * @param publisherConfig publisher settings\r
-     *\r
-     * @return pending messages to be published\r
-     */\r
-    protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,\r
-                                            @Nonnull final DMaaPMRPublisherConfig publisherConfig) {\r
-        return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();\r
-    }\r
-\r
-\r
-    /**\r
-     * Creates Subscriber Response for give response Code, response Message and fetch messages\r
-     *\r
-     * @param responseCode response Code\r
-     * @param responseMessage response Message\r
-     * @param fetchedMessages fetched messages\r
-     *\r
-     * @return DMaaP MR Subscriber Response\r
-     */\r
-    protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String\r
-            responseMessage, List<String> fetchedMessages) {\r
-        if (fetchedMessages == null) {\r
-            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);\r
-        } else {\r
-            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);\r
-        }\r
-    }\r
-\r
-\r
-    /**\r
-     * Custom response handler which extract status code and response body\r
-     *\r
-     * @return Pair containing Response code and response body\r
-     */\r
-    protected static ResponseHandler<Pair<Integer, String>> responseHandler() {\r
-        return new ResponseHandler<Pair<Integer, String>>() {\r
-            @Override\r
-            public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {\r
-                // Get Response status code\r
-                final int status = response.getStatusLine().getStatusCode();\r
-                final HttpEntity responseEntity = response.getEntity();\r
-                // If response entity is not null - extract response body as string\r
-                String responseEntityString = "";\r
-                if (responseEntity != null) {\r
-                    responseEntityString = EntityUtils.toString(responseEntity);\r
-                }\r
-                return new ImmutablePair<>(status, responseEntityString);\r
-            }\r
-        };\r
-    }\r
-\r
-\r
-    /**\r
-     *  Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will\r
-     *  be lost\r
-     *\r
-     * @param publisherQueue publisher queue\r
-     * @param messages recoverable messages to be published to recovery queue\r
-     */\r
-    protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,\r
-                                                     List<String> messages) {\r
-        try {\r
-            publisherQueue.addRecoverableMessages(messages);\r
-\r
-            LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",\r
-                    messages.size(), publisherQueue.getBatchQueueRemainingSize());\r
-\r
-        } catch (IllegalStateException e) {\r
-            final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +\r
-                            "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",\r
-                    messages.size(), publisherQueue.getRecoveryQueueRemainingSize());\r
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-        }\r
-    }\r
-\r
-\r
-    /**\r
-     * Converts List of messages to Json String Array which can be published to DMaaP MR topic.\r
-     *\r
-     * @param messages messages that need to parsed to Json Array representation\r
-     * @return json string representation of message\r
-     */\r
-    protected static String convertToJsonString(@Nullable final List<String> messages) {\r
-        // If messages are null or empty just return empty array\r
-        if (messages == null || messages.isEmpty()) {\r
-            return "[]";\r
-        }\r
-\r
-\r
-        List<JsonNode> jsonMessageObjectsList = new LinkedList<>();\r
-\r
-        try {\r
-            for (String message : messages) {\r
-                final JsonNode jsonNode = objectMapper.readTree(message);\r
-                jsonMessageObjectsList.add(jsonNode);\r
-            }\r
-            return objectMapper.writeValueAsString(jsonMessageObjectsList);\r
-        } catch (JsonProcessingException e) {\r
-            final String errorMessage =\r
-                    format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",\r
-                            messages, e);\r
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-\r
-        } catch (IOException e) {\r
-            final String errorMessage =\r
-                    format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",\r
-                            messages, e);\r
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-        }\r
-    }\r
-\r
-\r
-    /**\r
-     * Converts subscriber messages json string to List of messages. If message Json String is empty\r
-     * or null\r
-     *\r
-     * @param messagesJsonString json messages String\r
-     *\r
-     * @return List containing DMaaP MR Messages\r
-     */\r
-    protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {\r
-\r
-        final LinkedList<String> messages = new LinkedList<>();\r
-\r
-        // If message string is not null or not empty parse json message array to List of string messages\r
-        if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()\r
-                && !("[]").equals(messagesJsonString.trim())) {\r
-\r
-            try {\r
-                // get root node\r
-                final JsonNode rootNode = objectMapper.readTree(messagesJsonString);\r
-                // iterate over root node and parse arrays messages\r
-                for (JsonNode jsonNode : rootNode) {\r
-                    // if array parse it is array of messages\r
-                    final String incomingMessageString = jsonNode.toString();\r
-                    if (jsonNode.isArray()) {\r
-                        final List messageList = objectMapper.readValue(incomingMessageString, List.class);\r
-                        for (Object message : messageList) {\r
-                            final String jsonMessageString = objectMapper.writeValueAsString(message);\r
-                            addUnescapedJsonToMessage(messages, jsonMessageString);\r
-                        }\r
-                    } else {\r
-                        // parse it as object\r
-                        addUnescapedJsonToMessage(messages, incomingMessageString);\r
-                    }\r
-                }\r
-\r
-            } catch (IOException e) {\r
-                final String errorMessage =\r
-                        format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +\r
-                                " Json Error: %s", messagesJsonString, e);\r
-                throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-            }\r
-\r
-        }\r
-        return messages;\r
-    }\r
-\r
-    /**\r
-     * Adds unescaped Json messages to given messages list\r
-     *\r
-     * @param messages message list in which unescaped messages will be added\r
-     * @param incomingMessageString incoming message string that may need to be escaped\r
-     */\r
-    private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {\r
-        if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {\r
-            messages.add(StringEscapeUtils.unescapeJson(\r
-                    incomingMessageString.substring(1, incomingMessageString.length() - 1)));\r
-        } else {\r
-            messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));\r
-        }\r
-    }\r
-\r
-\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ *  dcae-analytics
+ * ================================================================================
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.dmaap.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.util.EntityUtils;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.common.utils.HTTPUtils;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;
+import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static java.lang.String.format;
+
+/**
+ * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates Base64 encoded Auth Header for given userName and Password
+     * If either user name of password are null return absent
+     *
+     * @param userName username
+     * @param userPassword user password
+     * @return base64 encoded auth header if username or password are both non null
+     */
+    protected static Optional<String> getAuthHeader(@Nullable final String userName,
+                                                    @Nullable final String userPassword) {
+        if (userName == null || userPassword == null) {
+            return Optional.absent();
+        } else {
+            final String auth = userName + ":" + userPassword;
+            final Charset isoCharset = Charset.forName("ISO-8859-1");
+            byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
+            return Optional.of("Basic " + new String(encodedAuth, isoCharset));
+        }
+    }
+
+
+    /**
+     * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
+     *
+     * @param publisherConfig publisher settings
+     *
+     * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
+     */
+    protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
+        final String hostName = publisherConfig.getHostName();
+        final Integer portNumber = publisherConfig.getPortNumber();
+        final String getProtocol = publisherConfig.getProtocol();
+        final String topicName = publisherConfig.getTopicName();
+        URI publisherURI = null;
+        try {
+            publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();
+        } catch (URISyntaxException e) {
+            final String errorMessage = format("Error while creating publisher URI: %s", e);
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+        }
+        LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
+        return publisherURI;
+    }
+
+
+    /**
+     * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
+     *
+     * @param subscriberConfig subscriber settings
+     *
+     * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
+     */
+    protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
+        final String hostName = subscriberConfig.getHostName();
+        final Integer portNumber = subscriberConfig.getPortNumber();
+        final String getProtocol = subscriberConfig.getProtocol();
+        final String topicName = subscriberConfig.getTopicName();
+        final String consumerId = subscriberConfig.getConsumerId();
+        final String consumerGroup = subscriberConfig.getConsumerGroup();
+        final Integer timeoutMS = subscriberConfig.getTimeoutMS();
+        final Integer messageLimit = subscriberConfig.getMessageLimit();
+        URI subscriberURI = null;
+        try {
+            URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX
+                            + topicName + "/"
+                            + consumerGroup + "/" +
+                            consumerId);
+            // add query params if present
+            if (timeoutMS > 0) {
+                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());
+            }
+            if (messageLimit > 0) {
+                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,
+                        messageLimit.toString());
+            }
+            subscriberURI = uriBuilder.build();
+
+        } catch (URISyntaxException e) {
+            final String errorMessage = format("Error while creating subscriber URI: %s", e);
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+        }
+
+        LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
+        return subscriberURI;
+    }
+
+
+    /**
+     *  Creates 202 (Accepted) Response code message
+     *
+     * @param batchQueueSize batch Queue size
+     *
+     * @return response with 202 message code
+     */
+    protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
+        return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
+                "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
+    }
+
+
+    /**
+     *  Creates 204 (No Content) Response code message
+     *
+     * @return response with 204 message code
+     */
+    protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
+        return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
+                "No Content - No Messages in batch queue for flushing to MR Topic", 0);
+    }
+
+
+    /**
+     * Creates Publisher Response for given response code, response Message and pending Message Count
+     *
+     * @param responseCode HTTP Status Code
+     * @param responseMessage response message
+     * @param pendingMessages pending messages in batch queue
+     *
+     * @return DMaaP MR Publisher Response
+     */
+    protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
+            responseMessage, int pendingMessages) {
+        return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
+    }
+
+
+    /**
+     * Returns weekly consistent pending messages in batch queue
+     *
+     * @param publisherQueue batch queue
+     * @param publisherConfig publisher settings
+     *
+     * @return pending messages to be published
+     */
+    protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
+                                            @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
+        return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
+    }
+
+
+    /**
+     * Creates Subscriber Response for give response Code, response Message and fetch messages
+     *
+     * @param responseCode response Code
+     * @param responseMessage response Message
+     * @param fetchedMessages fetched messages
+     *
+     * @return DMaaP MR Subscriber Response
+     */
+    protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
+            responseMessage, List<String> fetchedMessages) {
+        if (fetchedMessages == null) {
+            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
+        } else {
+            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
+        }
+    }
+
+
+    /**
+     * Custom response handler which extract status code and response body
+     *
+     * @return Pair containing Response code and response body
+     */
+    protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
+        return new ResponseHandler<Pair<Integer, String>>() {
+            @Override
+            public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
+                // Get Response status code
+                final int status = response.getStatusLine().getStatusCode();
+                final HttpEntity responseEntity = response.getEntity();
+                // If response entity is not null - extract response body as string
+                String responseEntityString = "";
+                if (responseEntity != null) {
+                    responseEntityString = EntityUtils.toString(responseEntity);
+                }
+                return new ImmutablePair<>(status, responseEntityString);
+            }
+        };
+    }
+
+
+    /**
+     *  Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
+     *  be lost
+     *
+     * @param publisherQueue publisher queue
+     * @param messages recoverable messages to be published to recovery queue
+     */
+    protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
+                                                     List<String> messages) {
+        try {
+            publisherQueue.addRecoverableMessages(messages);
+
+            LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
+                    messages.size(), publisherQueue.getBatchQueueRemainingSize());
+
+        } catch (IllegalStateException e) {
+            final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
+                            "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
+                    messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+        }
+    }
+
+
+    /**
+     * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
+     *
+     * @param messages messages that need to parsed to Json Array representation
+     * @return json string representation of message
+     */
+    protected static String convertToJsonString(@Nullable final List<String> messages) {
+        // If messages are null or empty just return empty array
+        if (messages == null || messages.isEmpty()) {
+            return "[]";
+        }
+
+
+        List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
+
+        try {
+            for (String message : messages) {
+                final JsonNode jsonNode = objectMapper.readTree(message);
+                jsonMessageObjectsList.add(jsonNode);
+            }
+            return objectMapper.writeValueAsString(jsonMessageObjectsList);
+        } catch (JsonProcessingException e) {
+            final String errorMessage =
+                    format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
+                            messages, e);
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+
+        } catch (IOException e) {
+            final String errorMessage =
+                    format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
+                            messages, e);
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+        }
+    }
+
+
+    /**
+     * Converts subscriber messages json string to List of messages. If message Json String is empty
+     * or null
+     *
+     * @param messagesJsonString json messages String
+     *
+     * @return List containing DMaaP MR Messages
+     */
+    protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
+
+        final LinkedList<String> messages = new LinkedList<>();
+
+        // If message string is not null or not empty parse json message array to List of string messages
+        if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
+                && !("[]").equals(messagesJsonString.trim())) {
+
+            try {
+                // get root node
+                final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
+                // iterate over root node and parse arrays messages
+                for (JsonNode jsonNode : rootNode) {
+                    // if array parse it is array of messages
+                    final String incomingMessageString = jsonNode.toString();
+                    if (jsonNode.isArray()) {
+                        final List messageList = objectMapper.readValue(incomingMessageString, List.class);
+                        for (Object message : messageList) {
+                            final String jsonMessageString = objectMapper.writeValueAsString(message);
+                            addUnescapedJsonToMessage(messages, jsonMessageString);
+                        }
+                    } else {
+                        // parse it as object
+                        addUnescapedJsonToMessage(messages, incomingMessageString);
+                    }
+                }
+
+            } catch (IOException e) {
+                final String errorMessage =
+                        format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
+                                " Json Error: %s", messagesJsonString, e);
+                throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+            }
+
+        }
+        return messages;
+    }
+
+    /**
+     * Adds unescaped Json messages to given messages list
+     *
+     * @param messages message list in which unescaped messages will be added
+     * @param incomingMessageString incoming message string that may need to be escaped
+     */
+    private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
+        if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
+            messages.add(StringEscapeUtils.unescapeJson(
+                    incomingMessageString.substring(1, incomingMessageString.length() - 1)));
+        } else {
+            messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));
+        }
+    }
+
+
+}