-/*\r
- * ===============================LICENSE_START======================================\r
- * dcae-analytics\r
- * ================================================================================\r
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.dmaap.service;\r
-\r
-import com.fasterxml.jackson.core.JsonProcessingException;\r
-import com.fasterxml.jackson.databind.JsonNode;\r
-import com.fasterxml.jackson.databind.ObjectMapper;\r
-import com.google.common.base.Optional;\r
-import org.apache.commons.codec.binary.Base64;\r
-import org.apache.commons.lang3.StringEscapeUtils;\r
-import org.apache.commons.lang3.tuple.ImmutablePair;\r
-import org.apache.commons.lang3.tuple.Pair;\r
-import org.apache.http.HttpEntity;\r
-import org.apache.http.HttpResponse;\r
-import org.apache.http.client.ResponseHandler;\r
-import org.apache.http.client.utils.URIBuilder;\r
-import org.apache.http.util.EntityUtils;\r
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
-import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;\r
-import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import java.io.IOException;\r
-import java.net.URI;\r
-import java.net.URISyntaxException;\r
-import java.nio.charset.Charset;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-\r
-import javax.annotation.Nonnull;\r
-import javax.annotation.Nullable;\r
-\r
-import static java.lang.String.format;\r
-\r
-/**\r
- * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods\r
- *\r
- * @author Rajiv Singla . Creation Date: 11/1/2016.\r
- */\r
-public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);\r
-\r
- private static final ObjectMapper objectMapper = new ObjectMapper();\r
-\r
- /**\r
- * Creates Base64 encoded Auth Header for given userName and Password\r
- * If either user name of password are null return absent\r
- *\r
- * @param userName username\r
- * @param userPassword user password\r
- * @return base64 encoded auth header if username or password are both non null\r
- */\r
- protected static Optional<String> getAuthHeader(@Nullable final String userName,\r
- @Nullable final String userPassword) {\r
- if (userName == null || userPassword == null) {\r
- return Optional.absent();\r
- } else {\r
- final String auth = userName + ":" + userPassword;\r
- final Charset isoCharset = Charset.forName("ISO-8859-1");\r
- byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));\r
- return Optional.of("Basic " + new String(encodedAuth, isoCharset));\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}\r
- *\r
- * @param publisherConfig publisher settings\r
- *\r
- * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic\r
- */\r
- protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {\r
- final String hostName = publisherConfig.getHostName();\r
- final Integer portNumber = publisherConfig.getPortNumber();\r
- final String getProtocol = publisherConfig.getProtocol();\r
- final String topicName = publisherConfig.getTopicName();\r
- URI publisherURI = null;\r
- try {\r
- publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
- .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();\r
- } catch (URISyntaxException e) {\r
- final String errorMessage = format("Error while creating publisher URI: %s", e);\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
- }\r
- LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);\r
- return publisherURI;\r
- }\r
-\r
-\r
- /**\r
- * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}\r
- *\r
- * @param subscriberConfig subscriber settings\r
- *\r
- * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic\r
- */\r
- protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {\r
- final String hostName = subscriberConfig.getHostName();\r
- final Integer portNumber = subscriberConfig.getPortNumber();\r
- final String getProtocol = subscriberConfig.getProtocol();\r
- final String topicName = subscriberConfig.getTopicName();\r
- final String consumerId = subscriberConfig.getConsumerId();\r
- final String consumerGroup = subscriberConfig.getConsumerGroup();\r
- final Integer timeoutMS = subscriberConfig.getTimeoutMS();\r
- final Integer messageLimit = subscriberConfig.getMessageLimit();\r
- URI subscriberURI = null;\r
- try {\r
- URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
- .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX\r
- + topicName + "/"\r
- + consumerGroup + "/" +\r
- consumerId);\r
- // add query params if present\r
- if (timeoutMS > 0) {\r
- uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());\r
- }\r
- if (messageLimit > 0) {\r
- uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,\r
- messageLimit.toString());\r
- }\r
- subscriberURI = uriBuilder.build();\r
-\r
- } catch (URISyntaxException e) {\r
- final String errorMessage = format("Error while creating subscriber URI: %s", e);\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
- }\r
-\r
- LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);\r
- return subscriberURI;\r
- }\r
-\r
-\r
- /**\r
- * Creates 202 (Accepted) Response code message\r
- *\r
- * @param batchQueueSize batch Queue size\r
- *\r
- * @return response with 202 message code\r
- */\r
- protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {\r
- return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,\r
- "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);\r
- }\r
-\r
-\r
- /**\r
- * Creates 204 (No Content) Response code message\r
- *\r
- * @return response with 204 message code\r
- */\r
- protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {\r
- return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,\r
- "No Content - No Messages in batch queue for flushing to MR Topic", 0);\r
- }\r
-\r
-\r
- /**\r
- * Creates Publisher Response for given response code, response Message and pending Message Count\r
- *\r
- * @param responseCode HTTP Status Code\r
- * @param responseMessage response message\r
- * @param pendingMessages pending messages in batch queue\r
- *\r
- * @return DMaaP MR Publisher Response\r
- */\r
- protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String\r
- responseMessage, int pendingMessages) {\r
- return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);\r
- }\r
-\r
-\r
- /**\r
- * Returns weekly consistent pending messages in batch queue\r
- *\r
- * @param publisherQueue batch queue\r
- * @param publisherConfig publisher settings\r
- *\r
- * @return pending messages to be published\r
- */\r
- protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,\r
- @Nonnull final DMaaPMRPublisherConfig publisherConfig) {\r
- return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();\r
- }\r
-\r
-\r
- /**\r
- * Creates Subscriber Response for give response Code, response Message and fetch messages\r
- *\r
- * @param responseCode response Code\r
- * @param responseMessage response Message\r
- * @param fetchedMessages fetched messages\r
- *\r
- * @return DMaaP MR Subscriber Response\r
- */\r
- protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String\r
- responseMessage, List<String> fetchedMessages) {\r
- if (fetchedMessages == null) {\r
- return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);\r
- } else {\r
- return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Custom response handler which extract status code and response body\r
- *\r
- * @return Pair containing Response code and response body\r
- */\r
- protected static ResponseHandler<Pair<Integer, String>> responseHandler() {\r
- return new ResponseHandler<Pair<Integer, String>>() {\r
- @Override\r
- public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {\r
- // Get Response status code\r
- final int status = response.getStatusLine().getStatusCode();\r
- final HttpEntity responseEntity = response.getEntity();\r
- // If response entity is not null - extract response body as string\r
- String responseEntityString = "";\r
- if (responseEntity != null) {\r
- responseEntityString = EntityUtils.toString(responseEntity);\r
- }\r
- return new ImmutablePair<>(status, responseEntityString);\r
- }\r
- };\r
- }\r
-\r
-\r
- /**\r
- * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will\r
- * be lost\r
- *\r
- * @param publisherQueue publisher queue\r
- * @param messages recoverable messages to be published to recovery queue\r
- */\r
- protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,\r
- List<String> messages) {\r
- try {\r
- publisherQueue.addRecoverableMessages(messages);\r
-\r
- LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",\r
- messages.size(), publisherQueue.getBatchQueueRemainingSize());\r
-\r
- } catch (IllegalStateException e) {\r
- final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +\r
- "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",\r
- messages.size(), publisherQueue.getRecoveryQueueRemainingSize());\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Converts List of messages to Json String Array which can be published to DMaaP MR topic.\r
- *\r
- * @param messages messages that need to parsed to Json Array representation\r
- * @return json string representation of message\r
- */\r
- protected static String convertToJsonString(@Nullable final List<String> messages) {\r
- // If messages are null or empty just return empty array\r
- if (messages == null || messages.isEmpty()) {\r
- return "[]";\r
- }\r
-\r
-\r
- List<JsonNode> jsonMessageObjectsList = new LinkedList<>();\r
-\r
- try {\r
- for (String message : messages) {\r
- final JsonNode jsonNode = objectMapper.readTree(message);\r
- jsonMessageObjectsList.add(jsonNode);\r
- }\r
- return objectMapper.writeValueAsString(jsonMessageObjectsList);\r
- } catch (JsonProcessingException e) {\r
- final String errorMessage =\r
- format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",\r
- messages, e);\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
-\r
- } catch (IOException e) {\r
- final String errorMessage =\r
- format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",\r
- messages, e);\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
- }\r
- }\r
-\r
-\r
- /**\r
- * Converts subscriber messages json string to List of messages. If message Json String is empty\r
- * or null\r
- *\r
- * @param messagesJsonString json messages String\r
- *\r
- * @return List containing DMaaP MR Messages\r
- */\r
- protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {\r
-\r
- final LinkedList<String> messages = new LinkedList<>();\r
-\r
- // If message string is not null or not empty parse json message array to List of string messages\r
- if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()\r
- && !("[]").equals(messagesJsonString.trim())) {\r
-\r
- try {\r
- // get root node\r
- final JsonNode rootNode = objectMapper.readTree(messagesJsonString);\r
- // iterate over root node and parse arrays messages\r
- for (JsonNode jsonNode : rootNode) {\r
- // if array parse it is array of messages\r
- final String incomingMessageString = jsonNode.toString();\r
- if (jsonNode.isArray()) {\r
- final List messageList = objectMapper.readValue(incomingMessageString, List.class);\r
- for (Object message : messageList) {\r
- final String jsonMessageString = objectMapper.writeValueAsString(message);\r
- addUnescapedJsonToMessage(messages, jsonMessageString);\r
- }\r
- } else {\r
- // parse it as object\r
- addUnescapedJsonToMessage(messages, incomingMessageString);\r
- }\r
- }\r
-\r
- } catch (IOException e) {\r
- final String errorMessage =\r
- format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +\r
- " Json Error: %s", messagesJsonString, e);\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
- }\r
-\r
- }\r
- return messages;\r
- }\r
-\r
- /**\r
- * Adds unescaped Json messages to given messages list\r
- *\r
- * @param messages message list in which unescaped messages will be added\r
- * @param incomingMessageString incoming message string that may need to be escaped\r
- */\r
- private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {\r
- if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {\r
- messages.add(StringEscapeUtils.unescapeJson(\r
- incomingMessageString.substring(1, incomingMessageString.length() - 1)));\r
- } else {\r
- messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));\r
- }\r
- }\r
-\r
-\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.dmaap.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.util.EntityUtils;
+import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.common.utils.HTTPUtils;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;
+import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static java.lang.String.format;
+
+/**
+ * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Creates Base64 encoded Auth Header for given userName and Password
+ * If either user name of password are null return absent
+ *
+ * @param userName username
+ * @param userPassword user password
+ * @return base64 encoded auth header if username or password are both non null
+ */
+ protected static Optional<String> getAuthHeader(@Nullable final String userName,
+ @Nullable final String userPassword) {
+ if (userName == null || userPassword == null) {
+ return Optional.absent();
+ } else {
+ final String auth = userName + ":" + userPassword;
+ final Charset isoCharset = Charset.forName("ISO-8859-1");
+ byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
+ return Optional.of("Basic " + new String(encodedAuth, isoCharset));
+ }
+ }
+
+
+ /**
+ * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
+ *
+ * @param publisherConfig publisher settings
+ *
+ * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
+ */
+ protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
+ final String hostName = publisherConfig.getHostName();
+ final Integer portNumber = publisherConfig.getPortNumber();
+ final String getProtocol = publisherConfig.getProtocol();
+ final String topicName = publisherConfig.getTopicName();
+ URI publisherURI = null;
+ try {
+ publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating publisher URI: %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
+ return publisherURI;
+ }
+
+
+ /**
+ * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
+ *
+ * @param subscriberConfig subscriber settings
+ *
+ * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
+ */
+ protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
+ final String hostName = subscriberConfig.getHostName();
+ final Integer portNumber = subscriberConfig.getPortNumber();
+ final String getProtocol = subscriberConfig.getProtocol();
+ final String topicName = subscriberConfig.getTopicName();
+ final String consumerId = subscriberConfig.getConsumerId();
+ final String consumerGroup = subscriberConfig.getConsumerGroup();
+ final Integer timeoutMS = subscriberConfig.getTimeoutMS();
+ final Integer messageLimit = subscriberConfig.getMessageLimit();
+ URI subscriberURI = null;
+ try {
+ URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX
+ + topicName + "/"
+ + consumerGroup + "/" +
+ consumerId);
+ // add query params if present
+ if (timeoutMS > 0) {
+ uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());
+ }
+ if (messageLimit > 0) {
+ uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,
+ messageLimit.toString());
+ }
+ subscriberURI = uriBuilder.build();
+
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating subscriber URI: %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
+ return subscriberURI;
+ }
+
+
+ /**
+ * Creates 202 (Accepted) Response code message
+ *
+ * @param batchQueueSize batch Queue size
+ *
+ * @return response with 202 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
+ return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
+ "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
+ }
+
+
+ /**
+ * Creates 204 (No Content) Response code message
+ *
+ * @return response with 204 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
+ return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
+ "No Content - No Messages in batch queue for flushing to MR Topic", 0);
+ }
+
+
+ /**
+ * Creates Publisher Response for given response code, response Message and pending Message Count
+ *
+ * @param responseCode HTTP Status Code
+ * @param responseMessage response message
+ * @param pendingMessages pending messages in batch queue
+ *
+ * @return DMaaP MR Publisher Response
+ */
+ protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
+ responseMessage, int pendingMessages) {
+ return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
+ }
+
+
+ /**
+ * Returns weekly consistent pending messages in batch queue
+ *
+ * @param publisherQueue batch queue
+ * @param publisherConfig publisher settings
+ *
+ * @return pending messages to be published
+ */
+ protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
+ @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
+ return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
+ }
+
+
+ /**
+ * Creates Subscriber Response for give response Code, response Message and fetch messages
+ *
+ * @param responseCode response Code
+ * @param responseMessage response Message
+ * @param fetchedMessages fetched messages
+ *
+ * @return DMaaP MR Subscriber Response
+ */
+ protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
+ responseMessage, List<String> fetchedMessages) {
+ if (fetchedMessages == null) {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
+ } else {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
+ }
+ }
+
+
+ /**
+ * Custom response handler which extract status code and response body
+ *
+ * @return Pair containing Response code and response body
+ */
+ protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
+ return new ResponseHandler<Pair<Integer, String>>() {
+ @Override
+ public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
+ // Get Response status code
+ final int status = response.getStatusLine().getStatusCode();
+ final HttpEntity responseEntity = response.getEntity();
+ // If response entity is not null - extract response body as string
+ String responseEntityString = "";
+ if (responseEntity != null) {
+ responseEntityString = EntityUtils.toString(responseEntity);
+ }
+ return new ImmutablePair<>(status, responseEntityString);
+ }
+ };
+ }
+
+
+ /**
+ * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
+ * be lost
+ *
+ * @param publisherQueue publisher queue
+ * @param messages recoverable messages to be published to recovery queue
+ */
+ protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
+ List<String> messages) {
+ try {
+ publisherQueue.addRecoverableMessages(messages);
+
+ LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
+ messages.size(), publisherQueue.getBatchQueueRemainingSize());
+
+ } catch (IllegalStateException e) {
+ final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
+ "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
+ messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+
+ /**
+ * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
+ *
+ * @param messages messages that need to parsed to Json Array representation
+ * @return json string representation of message
+ */
+ protected static String convertToJsonString(@Nullable final List<String> messages) {
+ // If messages are null or empty just return empty array
+ if (messages == null || messages.isEmpty()) {
+ return "[]";
+ }
+
+
+ List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
+
+ try {
+ for (String message : messages) {
+ final JsonNode jsonNode = objectMapper.readTree(message);
+ jsonMessageObjectsList.add(jsonNode);
+ }
+ return objectMapper.writeValueAsString(jsonMessageObjectsList);
+ } catch (JsonProcessingException e) {
+ final String errorMessage =
+ format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+
+ /**
+ * Converts subscriber messages json string to List of messages. If message Json String is empty
+ * or null
+ *
+ * @param messagesJsonString json messages String
+ *
+ * @return List containing DMaaP MR Messages
+ */
+ protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
+
+ final LinkedList<String> messages = new LinkedList<>();
+
+ // If message string is not null or not empty parse json message array to List of string messages
+ if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
+ && !("[]").equals(messagesJsonString.trim())) {
+
+ try {
+ // get root node
+ final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
+ // iterate over root node and parse arrays messages
+ for (JsonNode jsonNode : rootNode) {
+ // if array parse it is array of messages
+ final String incomingMessageString = jsonNode.toString();
+ if (jsonNode.isArray()) {
+ final List messageList = objectMapper.readValue(incomingMessageString, List.class);
+ for (Object message : messageList) {
+ final String jsonMessageString = objectMapper.writeValueAsString(message);
+ addUnescapedJsonToMessage(messages, jsonMessageString);
+ }
+ } else {
+ // parse it as object
+ addUnescapedJsonToMessage(messages, incomingMessageString);
+ }
+ }
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
+ " Json Error: %s", messagesJsonString, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ }
+ return messages;
+ }
+
+ /**
+ * Adds unescaped Json messages to given messages list
+ *
+ * @param messages message list in which unescaped messages will be added
+ * @param incomingMessageString incoming message string that may need to be escaped
+ */
+ private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
+ if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
+ messages.add(StringEscapeUtils.unescapeJson(
+ incomingMessageString.substring(1, incomingMessageString.length() - 1)));
+ } else {
+ messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));
+ }
+ }
+
+
+}