TCA: Support for VES/A&AI enrichment
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / BaseDMaaPMRComponent.java
index 72f859c..22f25d1 100644 (file)
-/*
- * ===============================LICENSE_START======================================
- *  dcae-analytics
- * ================================================================================
- *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.util.EntityUtils;
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
-import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;
-import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import static java.lang.String.format;
-
-/**
- * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
- *
- * @author Rajiv Singla . Creation Date: 11/1/2016.
- */
-public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
-
-    private static final ObjectMapper objectMapper = new ObjectMapper();
-
-    /**
-     * Creates Base64 encoded Auth Header for given userName and Password
-     * If either user name of password are null return absent
-     *
-     * @param userName username
-     * @param userPassword user password
-     * @return base64 encoded auth header if username or password are both non null
-     */
-    protected static Optional<String> getAuthHeader(@Nullable final String userName,
-                                                    @Nullable final String userPassword) {
-        if (userName == null || userPassword == null) {
-            return Optional.absent();
-        } else {
-            final String auth = userName + ":" + userPassword;
-            final Charset isoCharset = Charset.forName("ISO-8859-1");
-            byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
-            return Optional.of("Basic " + new String(encodedAuth, isoCharset));
-        }
-    }
-
-
-    /**
-     * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
-     *
-     * @param publisherConfig publisher settings
-     *
-     * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
-     */
-    protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
-        final String hostName = publisherConfig.getHostName();
-        final Integer portNumber = publisherConfig.getPortNumber();
-        final String getProtocol = publisherConfig.getProtocol();
-        final String topicName = publisherConfig.getTopicName();
-        URI publisherURI = null;
-        try {
-            publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
-                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();
-        } catch (URISyntaxException e) {
-            final String errorMessage = format("Error while creating publisher URI: %s", e);
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-        }
-        LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
-        return publisherURI;
-    }
-
-
-    /**
-     * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
-     *
-     * @param subscriberConfig subscriber settings
-     *
-     * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
-     */
-    protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
-        final String hostName = subscriberConfig.getHostName();
-        final Integer portNumber = subscriberConfig.getPortNumber();
-        final String getProtocol = subscriberConfig.getProtocol();
-        final String topicName = subscriberConfig.getTopicName();
-        final String consumerId = subscriberConfig.getConsumerId();
-        final String consumerGroup = subscriberConfig.getConsumerGroup();
-        final Integer timeoutMS = subscriberConfig.getTimeoutMS();
-        final Integer messageLimit = subscriberConfig.getMessageLimit();
-        URI subscriberURI = null;
-        try {
-            URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
-                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX
-                            + topicName + "/"
-                            + consumerGroup + "/" +
-                            consumerId);
-            // add query params if present
-            if (timeoutMS > 0) {
-                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());
-            }
-            if (messageLimit > 0) {
-                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,
-                        messageLimit.toString());
-            }
-            subscriberURI = uriBuilder.build();
-
-        } catch (URISyntaxException e) {
-            final String errorMessage = format("Error while creating subscriber URI: %s", e);
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-        }
-
-        LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
-        return subscriberURI;
-    }
-
-
-    /**
-     *  Creates 202 (Accepted) Response code message
-     *
-     * @param batchQueueSize batch Queue size
-     *
-     * @return response with 202 message code
-     */
-    protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
-        return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
-                "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
-    }
-
-
-    /**
-     *  Creates 204 (No Content) Response code message
-     *
-     * @return response with 204 message code
-     */
-    protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
-        return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
-                "No Content - No Messages in batch queue for flushing to MR Topic", 0);
-    }
-
-
-    /**
-     * Creates Publisher Response for given response code, response Message and pending Message Count
-     *
-     * @param responseCode HTTP Status Code
-     * @param responseMessage response message
-     * @param pendingMessages pending messages in batch queue
-     *
-     * @return DMaaP MR Publisher Response
-     */
-    protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
-            responseMessage, int pendingMessages) {
-        return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
-    }
-
-
-    /**
-     * Returns weekly consistent pending messages in batch queue
-     *
-     * @param publisherQueue batch queue
-     * @param publisherConfig publisher settings
-     *
-     * @return pending messages to be published
-     */
-    protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
-                                            @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
-        return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
-    }
-
-
-    /**
-     * Creates Subscriber Response for give response Code, response Message and fetch messages
-     *
-     * @param responseCode response Code
-     * @param responseMessage response Message
-     * @param fetchedMessages fetched messages
-     *
-     * @return DMaaP MR Subscriber Response
-     */
-    protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
-            responseMessage, List<String> fetchedMessages) {
-        if (fetchedMessages == null) {
-            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
-        } else {
-            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
-        }
-    }
-
-
-    /**
-     * Custom response handler which extract status code and response body
-     *
-     * @return Pair containing Response code and response body
-     */
-    protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
-        return new ResponseHandler<Pair<Integer, String>>() {
-            @Override
-            public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
-                // Get Response status code
-                final int status = response.getStatusLine().getStatusCode();
-                final HttpEntity responseEntity = response.getEntity();
-                // If response entity is not null - extract response body as string
-                String responseEntityString = "";
-                if (responseEntity != null) {
-                    responseEntityString = EntityUtils.toString(responseEntity);
-                }
-                return new ImmutablePair<>(status, responseEntityString);
-            }
-        };
-    }
-
-
-    /**
-     *  Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
-     *  be lost
-     *
-     * @param publisherQueue publisher queue
-     * @param messages recoverable messages to be published to recovery queue
-     */
-    protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
-                                                     List<String> messages) {
-        try {
-            publisherQueue.addRecoverableMessages(messages);
-
-            LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
-                    messages.size(), publisherQueue.getBatchQueueRemainingSize());
-
-        } catch (IllegalStateException e) {
-            final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
-                            "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
-                    messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-        }
-    }
-
-
-    /**
-     * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
-     *
-     * @param messages messages that need to parsed to Json Array representation
-     * @return json string representation of message
-     */
-    protected static String convertToJsonString(@Nullable final List<String> messages) {
-        // If messages are null or empty just return empty array
-        if (messages == null || messages.isEmpty()) {
-            return "[]";
-        }
-
-
-        List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
-
-        try {
-            for (String message : messages) {
-                final JsonNode jsonNode = objectMapper.readTree(message);
-                jsonMessageObjectsList.add(jsonNode);
-            }
-            return objectMapper.writeValueAsString(jsonMessageObjectsList);
-        } catch (JsonProcessingException e) {
-            final String errorMessage =
-                    format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
-                            messages, e);
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-
-        } catch (IOException e) {
-            final String errorMessage =
-                    format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
-                            messages, e);
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-        }
-    }
-
-
-    /**
-     * Converts subscriber messages json string to List of messages. If message Json String is empty
-     * or null
-     *
-     * @param messagesJsonString json messages String
-     *
-     * @return List containing DMaaP MR Messages
-     */
-    protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
-
-        final LinkedList<String> messages = new LinkedList<>();
-
-        // If message string is not null or not empty parse json message array to List of string messages
-        if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
-                && !"[]".equals(messagesJsonString.trim())) {
-
-            try {
-                final List messageList = objectMapper.readValue(messagesJsonString, List.class);
-                for (Object message : messageList) {
-                    final String jsonMessageString = objectMapper.writeValueAsString(message);
-                    if (jsonMessageString.startsWith("\"") && jsonMessageString.endsWith("\"")) {
-                        final String jsonSubString = jsonMessageString.substring(1, jsonMessageString.length() - 1);
-                        messages.add(StringEscapeUtils.unescapeJson(jsonSubString));
-                    } else {
-                        messages.add(StringEscapeUtils.unescapeJson(jsonMessageString));
-                    }
-                }
-
-            } catch (IOException e) {
-                final String errorMessage =
-                        format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
-                                " Json Error: %s", messagesJsonString, e);
-                throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-            }
-
-        }
-        return messages;
-    }
-
-
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ *  dcae-analytics\r
+ * ================================================================================\r
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ *  Licensed under the Apache License, Version 2.0 (the "License");\r
+ *  you may not use this file except in compliance with the License.\r
+ *   You may obtain a copy of the License at\r
+ *\r
+ *          http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ *  Unless required by applicable law or agreed to in writing, software\r
+ *  distributed under the License is distributed on an "AS IS" BASIS,\r
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ *  See the License for the specific language governing permissions and\r
+ *  limitations under the License.\r
+ *  ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.dmaap.service;\r
+\r
+import com.fasterxml.jackson.core.JsonProcessingException;\r
+import com.fasterxml.jackson.databind.JsonNode;\r
+import com.fasterxml.jackson.databind.ObjectMapper;\r
+import com.google.common.base.Optional;\r
+import org.apache.commons.codec.binary.Base64;\r
+import org.apache.commons.lang3.StringEscapeUtils;\r
+import org.apache.commons.lang3.tuple.ImmutablePair;\r
+import org.apache.commons.lang3.tuple.Pair;\r
+import org.apache.http.HttpEntity;\r
+import org.apache.http.HttpResponse;\r
+import org.apache.http.client.ResponseHandler;\r
+import org.apache.http.client.utils.URIBuilder;\r
+import org.apache.http.util.EntityUtils;\r
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
+import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;\r
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.IOException;\r
+import java.net.URI;\r
+import java.net.URISyntaxException;\r
+import java.nio.charset.Charset;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+\r
+import javax.annotation.Nonnull;\r
+import javax.annotation.Nullable;\r
+\r
+import static java.lang.String.format;\r
+\r
+/**\r
+ * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods\r
+ *\r
+ * @author Rajiv Singla . Creation Date: 11/1/2016.\r
+ */\r
+public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {\r
+\r
+    private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);\r
+\r
+    private static final ObjectMapper objectMapper = new ObjectMapper();\r
+\r
+    /**\r
+     * Creates Base64 encoded Auth Header for given userName and Password\r
+     * If either user name of password are null return absent\r
+     *\r
+     * @param userName username\r
+     * @param userPassword user password\r
+     * @return base64 encoded auth header if username or password are both non null\r
+     */\r
+    protected static Optional<String> getAuthHeader(@Nullable final String userName,\r
+                                                    @Nullable final String userPassword) {\r
+        if (userName == null || userPassword == null) {\r
+            return Optional.absent();\r
+        } else {\r
+            final String auth = userName + ":" + userPassword;\r
+            final Charset isoCharset = Charset.forName("ISO-8859-1");\r
+            byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));\r
+            return Optional.of("Basic " + new String(encodedAuth, isoCharset));\r
+        }\r
+    }\r
+\r
+\r
+    /**\r
+     * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}\r
+     *\r
+     * @param publisherConfig publisher settings\r
+     *\r
+     * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic\r
+     */\r
+    protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {\r
+        final String hostName = publisherConfig.getHostName();\r
+        final Integer portNumber = publisherConfig.getPortNumber();\r
+        final String getProtocol = publisherConfig.getProtocol();\r
+        final String topicName = publisherConfig.getTopicName();\r
+        URI publisherURI = null;\r
+        try {\r
+            publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
+                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();\r
+        } catch (URISyntaxException e) {\r
+            final String errorMessage = format("Error while creating publisher URI: %s", e);\r
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+        }\r
+        LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);\r
+        return publisherURI;\r
+    }\r
+\r
+\r
+    /**\r
+     * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}\r
+     *\r
+     * @param subscriberConfig subscriber settings\r
+     *\r
+     * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic\r
+     */\r
+    protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {\r
+        final String hostName = subscriberConfig.getHostName();\r
+        final Integer portNumber = subscriberConfig.getPortNumber();\r
+        final String getProtocol = subscriberConfig.getProtocol();\r
+        final String topicName = subscriberConfig.getTopicName();\r
+        final String consumerId = subscriberConfig.getConsumerId();\r
+        final String consumerGroup = subscriberConfig.getConsumerGroup();\r
+        final Integer timeoutMS = subscriberConfig.getTimeoutMS();\r
+        final Integer messageLimit = subscriberConfig.getMessageLimit();\r
+        URI subscriberURI = null;\r
+        try {\r
+            URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
+                    .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX\r
+                            + topicName + "/"\r
+                            + consumerGroup + "/" +\r
+                            consumerId);\r
+            // add query params if present\r
+            if (timeoutMS > 0) {\r
+                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());\r
+            }\r
+            if (messageLimit > 0) {\r
+                uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,\r
+                        messageLimit.toString());\r
+            }\r
+            subscriberURI = uriBuilder.build();\r
+\r
+        } catch (URISyntaxException e) {\r
+            final String errorMessage = format("Error while creating subscriber URI: %s", e);\r
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+        }\r
+\r
+        LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);\r
+        return subscriberURI;\r
+    }\r
+\r
+\r
+    /**\r
+     *  Creates 202 (Accepted) Response code message\r
+     *\r
+     * @param batchQueueSize batch Queue size\r
+     *\r
+     * @return response with 202 message code\r
+     */\r
+    protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {\r
+        return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,\r
+                "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);\r
+    }\r
+\r
+\r
+    /**\r
+     *  Creates 204 (No Content) Response code message\r
+     *\r
+     * @return response with 204 message code\r
+     */\r
+    protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {\r
+        return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,\r
+                "No Content - No Messages in batch queue for flushing to MR Topic", 0);\r
+    }\r
+\r
+\r
+    /**\r
+     * Creates Publisher Response for given response code, response Message and pending Message Count\r
+     *\r
+     * @param responseCode HTTP Status Code\r
+     * @param responseMessage response message\r
+     * @param pendingMessages pending messages in batch queue\r
+     *\r
+     * @return DMaaP MR Publisher Response\r
+     */\r
+    protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String\r
+            responseMessage, int pendingMessages) {\r
+        return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);\r
+    }\r
+\r
+\r
+    /**\r
+     * Returns weekly consistent pending messages in batch queue\r
+     *\r
+     * @param publisherQueue batch queue\r
+     * @param publisherConfig publisher settings\r
+     *\r
+     * @return pending messages to be published\r
+     */\r
+    protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,\r
+                                            @Nonnull final DMaaPMRPublisherConfig publisherConfig) {\r
+        return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();\r
+    }\r
+\r
+\r
+    /**\r
+     * Creates Subscriber Response for give response Code, response Message and fetch messages\r
+     *\r
+     * @param responseCode response Code\r
+     * @param responseMessage response Message\r
+     * @param fetchedMessages fetched messages\r
+     *\r
+     * @return DMaaP MR Subscriber Response\r
+     */\r
+    protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String\r
+            responseMessage, List<String> fetchedMessages) {\r
+        if (fetchedMessages == null) {\r
+            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);\r
+        } else {\r
+            return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);\r
+        }\r
+    }\r
+\r
+\r
+    /**\r
+     * Custom response handler which extract status code and response body\r
+     *\r
+     * @return Pair containing Response code and response body\r
+     */\r
+    protected static ResponseHandler<Pair<Integer, String>> responseHandler() {\r
+        return new ResponseHandler<Pair<Integer, String>>() {\r
+            @Override\r
+            public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {\r
+                // Get Response status code\r
+                final int status = response.getStatusLine().getStatusCode();\r
+                final HttpEntity responseEntity = response.getEntity();\r
+                // If response entity is not null - extract response body as string\r
+                String responseEntityString = "";\r
+                if (responseEntity != null) {\r
+                    responseEntityString = EntityUtils.toString(responseEntity);\r
+                }\r
+                return new ImmutablePair<>(status, responseEntityString);\r
+            }\r
+        };\r
+    }\r
+\r
+\r
+    /**\r
+     *  Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will\r
+     *  be lost\r
+     *\r
+     * @param publisherQueue publisher queue\r
+     * @param messages recoverable messages to be published to recovery queue\r
+     */\r
+    protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,\r
+                                                     List<String> messages) {\r
+        try {\r
+            publisherQueue.addRecoverableMessages(messages);\r
+\r
+            LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",\r
+                    messages.size(), publisherQueue.getBatchQueueRemainingSize());\r
+\r
+        } catch (IllegalStateException e) {\r
+            final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +\r
+                            "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",\r
+                    messages.size(), publisherQueue.getRecoveryQueueRemainingSize());\r
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+        }\r
+    }\r
+\r
+\r
+    /**\r
+     * Converts List of messages to Json String Array which can be published to DMaaP MR topic.\r
+     *\r
+     * @param messages messages that need to parsed to Json Array representation\r
+     * @return json string representation of message\r
+     */\r
+    protected static String convertToJsonString(@Nullable final List<String> messages) {\r
+        // If messages are null or empty just return empty array\r
+        if (messages == null || messages.isEmpty()) {\r
+            return "[]";\r
+        }\r
+\r
+\r
+        List<JsonNode> jsonMessageObjectsList = new LinkedList<>();\r
+\r
+        try {\r
+            for (String message : messages) {\r
+                final JsonNode jsonNode = objectMapper.readTree(message);\r
+                jsonMessageObjectsList.add(jsonNode);\r
+            }\r
+            return objectMapper.writeValueAsString(jsonMessageObjectsList);\r
+        } catch (JsonProcessingException e) {\r
+            final String errorMessage =\r
+                    format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",\r
+                            messages, e);\r
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+\r
+        } catch (IOException e) {\r
+            final String errorMessage =\r
+                    format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",\r
+                            messages, e);\r
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+        }\r
+    }\r
+\r
+\r
+    /**\r
+     * Converts subscriber messages json string to List of messages. If message Json String is empty\r
+     * or null\r
+     *\r
+     * @param messagesJsonString json messages String\r
+     *\r
+     * @return List containing DMaaP MR Messages\r
+     */\r
+    protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {\r
+\r
+        final LinkedList<String> messages = new LinkedList<>();\r
+\r
+        // If message string is not null or not empty parse json message array to List of string messages\r
+        if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()\r
+                && !("[]").equals(messagesJsonString.trim())) {\r
+\r
+            try {\r
+                // get root node\r
+                final JsonNode rootNode = objectMapper.readTree(messagesJsonString);\r
+                // iterate over root node and parse arrays messages\r
+                for (JsonNode jsonNode : rootNode) {\r
+                    // if array parse it is array of messages\r
+                    final String incomingMessageString = jsonNode.toString();\r
+                    if (jsonNode.isArray()) {\r
+                        final List messageList = objectMapper.readValue(incomingMessageString, List.class);\r
+                        for (Object message : messageList) {\r
+                            final String jsonMessageString = objectMapper.writeValueAsString(message);\r
+                            addUnescapedJsonToMessage(messages, jsonMessageString);\r
+                        }\r
+                    } else {\r
+                        // parse it as object\r
+                        addUnescapedJsonToMessage(messages, incomingMessageString);\r
+                    }\r
+                }\r
+\r
+            } catch (IOException e) {\r
+                final String errorMessage =\r
+                        format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +\r
+                                " Json Error: %s", messagesJsonString, e);\r
+                throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+            }\r
+\r
+        }\r
+        return messages;\r
+    }\r
+\r
+    /**\r
+     * Adds unescaped Json messages to given messages list\r
+     *\r
+     * @param messages message list in which unescaped messages will be added\r
+     * @param incomingMessageString incoming message string that may need to be escaped\r
+     */\r
+    private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {\r
+        if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {\r
+            messages.add(StringEscapeUtils.unescapeJson(\r
+                    incomingMessageString.substring(1, incomingMessageString.length() - 1)));\r
+        } else {\r
+            messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));\r
+        }\r
+    }\r
+\r
+\r
+}\r