-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
-import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Date;
-import java.util.List;
-
-import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
-import static java.lang.String.format;
-
-/**
- * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
- *
- * @author Rajiv Singla . Creation Date: 10/13/2016.
- */
-public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
-
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
-
- private final DMaaPMRPublisherConfig publisherConfig;
- private final CloseableHttpClient closeableHttpClient;
- private final DMaaPMRPublisherQueue publisherQueue;
- private final Date publisherCreationTime;
- private URI publisherUri;
-
- @Inject
- public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
- DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
- CloseableHttpClient closeableHttpClient) {
-
- this.publisherConfig = publisherConfig;
- this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
- publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize());
- this.closeableHttpClient = closeableHttpClient;
- this.publisherUri = createPublisherURI(publisherConfig);
- this.publisherCreationTime = new Date();
- }
-
-
- @Override
- public DMaaPMRPublisherResponse publish(List<String> messages) {
-
- final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
-
- // if messages size is less than batch queue size - just queue them for batch publishing
- if (batchQueueRemainingSize > messages.size()) {
- LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
- messages.size(), batchQueueRemainingSize);
- final int batchQueueSize = publisherQueue.addBatchMessages(messages);
- return createPublisherAcceptedResponse(batchQueueSize);
-
- } else {
-
- // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
- final List<String> queueMessages = publisherQueue.getMessageForPublishing();
- LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
- "Publisher Topic.");
- return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
- }
-
- }
-
- @Override
- public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
-
- LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
-
- final String contentType = publisherConfig.getContentType();
- final String userName = publisherConfig.getUserName();
- final String userPassword = publisherConfig.getUserPassword();
- final HttpPost postRequest = new HttpPost(publisherUri);
-
- // add Authorization Header if username and password are present
- final Optional<String> authHeader = getAuthHeader(userName, userPassword);
- if (authHeader.isPresent()) {
- postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
- } else {
- LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
- }
-
- // Create post string entity
- final String messagesJson = convertToJsonString(messages);
- final StringEntity requestEntity =
- new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
- postRequest.setEntity(requestEntity);
-
- try {
- final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
- final Integer responseCode = responsePair.getLeft();
- final String responseBody = responsePair.getRight();
- // if messages were published successfully, return successful response
- if (isSuccessfulResponseCode(responseCode)) {
- LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
- "Body: {}, Number of Messages published: {}",
- responseCode, responseBody, messages.size());
-
- } else {
- LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
- "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
- addMessagesToRecoveryQueue(publisherQueue, messages);
- }
-
- return createPublisherResponse(responseCode, responseBody,
- getPendingMessages(publisherQueue, publisherConfig));
-
- } catch (IOException e) {
- // If IO Error then we need to also put messages in recovery queue
- addMessagesToRecoveryQueue(publisherQueue, messages);
- final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
- "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
-
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
- }
-
- }
-
-
- @Override
- public DMaaPMRPublisherResponse flush() {
- final List<String> queueMessages = publisherQueue.getMessageForPublishing();
- // If there are no message return 204 (No Content) response code
- if (queueMessages.isEmpty()) {
- LOG.debug("No messages to publish to batch queue. Returning 204 status code");
- return createPublisherNoContentResponse();
- } else {
- // force publish messages in queue
- return forcePublish(queueMessages);
- }
- }
-
- @Override
- public Date getPublisherCreationTime() {
- return new Date(publisherCreationTime.getTime());
- }
-
- @Override
- public void close() throws Exception {
-
- // flush current message in the queue
- int retrialNumber = 0;
- int flushResponseCode;
-
- // automatic retries if messages cannot be flushed
- do {
- retrialNumber++;
- DMaaPMRPublisherResponse flushResponse = flush();
- flushResponseCode = flushResponse.getResponseCode();
-
- if (!isSuccessfulResponseCode(flushResponseCode)) {
- LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
- "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
- AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);
-
- Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);
- }
- } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&
- !isSuccessfulResponseCode(flushResponseCode));
-
- if (!isSuccessfulResponseCode(flushResponseCode)) {
- LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
- } else {
- LOG.info("Successfully published all batched messages to publisher.");
- }
-
- // close http client
- closeableHttpClient.close();
-
- }
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ * dcae-analytics\r
+ * ================================================================================\r
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;\r
+\r
+import com.google.common.base.Optional;\r
+import com.google.common.collect.Iterables;\r
+import com.google.common.collect.Lists;\r
+import com.google.inject.Inject;\r
+import com.google.inject.assistedinject.Assisted;\r
+import org.apache.commons.lang3.tuple.Pair;\r
+import org.apache.http.HttpHeaders;\r
+import org.apache.http.client.HttpClient;\r
+import org.apache.http.client.methods.HttpPost;\r
+import org.apache.http.entity.ContentType;\r
+import org.apache.http.entity.StringEntity;\r
+import org.apache.http.impl.client.CloseableHttpClient;\r
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;\r
+import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.IOException;\r
+import java.net.URI;\r
+import java.util.Date;\r
+import java.util.List;\r
+\r
+import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;\r
+import static java.lang.String.format;\r
+\r
+/**\r
+ * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}\r
+ *\r
+ * @author Rajiv Singla . Creation Date: 10/13/2016.\r
+ */\r
+public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {\r
+\r
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);\r
+\r
+ private final DMaaPMRPublisherConfig publisherConfig;\r
+ private final CloseableHttpClient closeableHttpClient;\r
+ private final DMaaPMRPublisherQueue publisherQueue;\r
+ private final Date publisherCreationTime;\r
+ private URI publisherUri;\r
+\r
+ @Inject\r
+ public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,\r
+ DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,\r
+ CloseableHttpClient closeableHttpClient) {\r
+\r
+ this.publisherConfig = publisherConfig;\r
+ this.publisherQueue = dMaaPMRPublisherQueueFactory.create(\r
+ publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize());\r
+ this.closeableHttpClient = closeableHttpClient;\r
+ this.publisherUri = createPublisherURI(publisherConfig);\r
+ this.publisherCreationTime = new Date();\r
+ }\r
+\r
+\r
+ @Override\r
+ public DMaaPMRPublisherResponse publish(List<String> messages) {\r
+\r
+ final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();\r
+\r
+ // if messages size is less than batch queue size - just queue them for batch publishing\r
+ if (batchQueueRemainingSize > messages.size()) {\r
+ LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",\r
+ messages.size(), batchQueueRemainingSize);\r
+ final int batchQueueSize = publisherQueue.addBatchMessages(messages);\r
+ return createPublisherAcceptedResponse(batchQueueSize);\r
+\r
+ } else {\r
+\r
+ // grab all already queued messages, append current messages and force publish them to DMaaP MR topic\r
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();\r
+ LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +\r
+ "Publisher Topic.");\r
+ return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));\r
+ }\r
+\r
+ }\r
+\r
+ @Override\r
+ public DMaaPMRPublisherResponse forcePublish(List<String> messages) {\r
+\r
+ LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());\r
+\r
+ final String contentType = publisherConfig.getContentType();\r
+ final String userName = publisherConfig.getUserName();\r
+ final String userPassword = publisherConfig.getUserPassword();\r
+ final HttpPost postRequest = new HttpPost(publisherUri);\r
+\r
+ // add Authorization Header if username and password are present\r
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);\r
+ if (authHeader.isPresent()) {\r
+ postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());\r
+ } else {\r
+ LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");\r
+ }\r
+\r
+ // Create post string entity\r
+ final String messagesJson = convertToJsonString(messages);\r
+ final StringEntity requestEntity =\r
+ new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));\r
+ postRequest.setEntity(requestEntity);\r
+\r
+ try {\r
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());\r
+ final Integer responseCode = responsePair.getLeft();\r
+ final String responseBody = responsePair.getRight();\r
+ // if messages were published successfully, return successful response\r
+ if (isSuccessfulResponseCode(responseCode)) {\r
+ LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +\r
+ "Body: {}, Number of Messages published: {}",\r
+ responseCode, responseBody, messages.size());\r
+\r
+ } else {\r
+ LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +\r
+ "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);\r
+ addMessagesToRecoveryQueue(publisherQueue, messages);\r
+ }\r
+\r
+ return createPublisherResponse(responseCode, responseBody,\r
+ getPendingMessages(publisherQueue, publisherConfig));\r
+\r
+ } catch (IOException e) {\r
+ // If IO Error then we need to also put messages in recovery queue\r
+ addMessagesToRecoveryQueue(publisherQueue, messages);\r
+ final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +\r
+ "Messages will be queued in recovery queue. Messages Size: %d", messages.size());\r
+\r
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+ }\r
+\r
+ }\r
+\r
+\r
+ @Override\r
+ public DMaaPMRPublisherResponse flush() {\r
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();\r
+ // If there are no message return 204 (No Content) response code\r
+ if (queueMessages.isEmpty()) {\r
+ LOG.debug("No messages to publish to batch queue. Returning 204 status code");\r
+ return createPublisherNoContentResponse();\r
+ } else {\r
+ // force publish messages in queue\r
+ return forcePublish(queueMessages);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public Date getPublisherCreationTime() {\r
+ return new Date(publisherCreationTime.getTime());\r
+ }\r
+\r
+ @Override\r
+ public void close() throws Exception {\r
+\r
+ // flush current message in the queue\r
+ int retrialNumber = 0;\r
+ int flushResponseCode;\r
+\r
+ // automatic retries if messages cannot be flushed\r
+ do {\r
+ retrialNumber++;\r
+ DMaaPMRPublisherResponse flushResponse = flush();\r
+ flushResponseCode = flushResponse.getResponseCode();\r
+\r
+ if (!isSuccessfulResponseCode(flushResponseCode)) {\r
+ LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +\r
+ "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,\r
+ AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);\r
+\r
+ Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);\r
+ }\r
+ } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&\r
+ !isSuccessfulResponseCode(flushResponseCode));\r
+\r
+ if (!isSuccessfulResponseCode(flushResponseCode)) {\r
+ LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");\r
+ } else {\r
+ LOG.info("Successfully published all batched messages to publisher.");\r
+ }\r
+\r
+ // close http client\r
+ closeableHttpClient.close();\r
+\r
+ }\r
+}\r