TCA: Support for VES/A&AI enrichment
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / publisher / DMaaPMRPublisherImpl.java
index 1901189..b3e303e 100644 (file)
-/*
- * ===============================LICENSE_START======================================
- *  dcae-analytics
- * ================================================================================
- *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *  ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
-import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Date;
-import java.util.List;
-
-import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
-import static java.lang.String.format;
-
-/**
- * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
- *
- * @author Rajiv Singla . Creation Date: 10/13/2016.
- */
-public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
-
-    private final DMaaPMRPublisherConfig publisherConfig;
-    private final CloseableHttpClient closeableHttpClient;
-    private final DMaaPMRPublisherQueue publisherQueue;
-    private final Date publisherCreationTime;
-    private URI publisherUri;
-
-    @Inject
-    public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
-                                DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
-                                CloseableHttpClient closeableHttpClient) {
-
-        this.publisherConfig = publisherConfig;
-        this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
-                publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize());
-        this.closeableHttpClient = closeableHttpClient;
-        this.publisherUri = createPublisherURI(publisherConfig);
-        this.publisherCreationTime = new Date();
-    }
-
-
-    @Override
-    public DMaaPMRPublisherResponse publish(List<String> messages)  {
-
-        final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
-
-        // if messages size is less than batch queue size - just queue them for batch publishing
-        if (batchQueueRemainingSize > messages.size()) {
-            LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
-                    messages.size(), batchQueueRemainingSize);
-            final int batchQueueSize = publisherQueue.addBatchMessages(messages);
-            return createPublisherAcceptedResponse(batchQueueSize);
-
-        } else {
-
-            // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
-            final List<String> queueMessages = publisherQueue.getMessageForPublishing();
-            LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
-                    "Publisher Topic.");
-            return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
-        }
-
-    }
-
-    @Override
-    public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
-
-        LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
-
-        final String contentType = publisherConfig.getContentType();
-        final String userName = publisherConfig.getUserName();
-        final String userPassword = publisherConfig.getUserPassword();
-        final HttpPost postRequest = new HttpPost(publisherUri);
-
-        // add Authorization Header if username and password are present
-        final Optional<String> authHeader = getAuthHeader(userName, userPassword);
-        if (authHeader.isPresent()) {
-            postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
-        } else {
-            LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
-        }
-
-        // Create post string entity
-        final String messagesJson = convertToJsonString(messages);
-        final StringEntity requestEntity =
-                new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
-        postRequest.setEntity(requestEntity);
-
-        try {
-            final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
-            final Integer responseCode = responsePair.getLeft();
-            final String responseBody = responsePair.getRight();
-            // if messages were published successfully, return successful response
-            if (isSuccessfulResponseCode(responseCode)) {
-                LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
-                                "Body: {}, Number of Messages published: {}",
-                        responseCode, responseBody, messages.size());
-
-            } else {
-                LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
-                        "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
-                addMessagesToRecoveryQueue(publisherQueue, messages);
-            }
-
-            return createPublisherResponse(responseCode, responseBody,
-                    getPendingMessages(publisherQueue, publisherConfig));
-
-        } catch (IOException e) {
-            // If IO Error then we need to also put messages in recovery queue
-            addMessagesToRecoveryQueue(publisherQueue, messages);
-            final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
-                    "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
-
-            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
-        }
-
-    }
-
-
-    @Override
-    public DMaaPMRPublisherResponse flush() {
-        final List<String> queueMessages = publisherQueue.getMessageForPublishing();
-        // If there are no message return 204 (No Content) response code
-        if (queueMessages.isEmpty()) {
-            LOG.debug("No messages to publish to batch queue. Returning 204 status code");
-            return createPublisherNoContentResponse();
-        } else {
-            // force publish messages in queue
-            return forcePublish(queueMessages);
-        }
-    }
-
-    @Override
-    public Date getPublisherCreationTime() {
-        return new Date(publisherCreationTime.getTime());
-    }
-
-    @Override
-    public void close() throws Exception {
-
-        // flush current message in the queue
-        int retrialNumber = 0;
-        int flushResponseCode;
-
-        // automatic retries if messages cannot be flushed
-        do {
-            retrialNumber++;
-            DMaaPMRPublisherResponse flushResponse = flush();
-            flushResponseCode = flushResponse.getResponseCode();
-
-            if (!isSuccessfulResponseCode(flushResponseCode)) {
-                LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
-                                "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
-                        AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);
-
-                Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);
-            }
-        } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&
-                !isSuccessfulResponseCode(flushResponseCode));
-
-        if (!isSuccessfulResponseCode(flushResponseCode)) {
-            LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
-        } else {
-            LOG.info("Successfully published all batched messages to publisher.");
-        }
-
-        // close http client
-        closeableHttpClient.close();
-
-    }
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ *  dcae-analytics\r
+ * ================================================================================\r
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ *  Licensed under the Apache License, Version 2.0 (the "License");\r
+ *  you may not use this file except in compliance with the License.\r
+ *   You may obtain a copy of the License at\r
+ *\r
+ *          http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ *  Unless required by applicable law or agreed to in writing, software\r
+ *  distributed under the License is distributed on an "AS IS" BASIS,\r
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ *  See the License for the specific language governing permissions and\r
+ *  limitations under the License.\r
+ *  ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;\r
+\r
+import com.google.common.base.Optional;\r
+import com.google.common.collect.Iterables;\r
+import com.google.common.collect.Lists;\r
+import com.google.inject.Inject;\r
+import com.google.inject.assistedinject.Assisted;\r
+import org.apache.commons.lang3.tuple.Pair;\r
+import org.apache.http.HttpHeaders;\r
+import org.apache.http.client.HttpClient;\r
+import org.apache.http.client.methods.HttpPost;\r
+import org.apache.http.entity.ContentType;\r
+import org.apache.http.entity.StringEntity;\r
+import org.apache.http.impl.client.CloseableHttpClient;\r
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;\r
+import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.IOException;\r
+import java.net.URI;\r
+import java.util.Date;\r
+import java.util.List;\r
+\r
+import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;\r
+import static java.lang.String.format;\r
+\r
+/**\r
+ * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}\r
+ *\r
+ * @author Rajiv Singla . Creation Date: 10/13/2016.\r
+ */\r
+public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {\r
+\r
+    private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);\r
+\r
+    private final DMaaPMRPublisherConfig publisherConfig;\r
+    private final CloseableHttpClient closeableHttpClient;\r
+    private final DMaaPMRPublisherQueue publisherQueue;\r
+    private final Date publisherCreationTime;\r
+    private URI publisherUri;\r
+\r
+    @Inject\r
+    public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,\r
+                                DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,\r
+                                CloseableHttpClient closeableHttpClient) {\r
+\r
+        this.publisherConfig = publisherConfig;\r
+        this.publisherQueue = dMaaPMRPublisherQueueFactory.create(\r
+                publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize());\r
+        this.closeableHttpClient = closeableHttpClient;\r
+        this.publisherUri = createPublisherURI(publisherConfig);\r
+        this.publisherCreationTime = new Date();\r
+    }\r
+\r
+\r
+    @Override\r
+    public DMaaPMRPublisherResponse publish(List<String> messages)  {\r
+\r
+        final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();\r
+\r
+        // if messages size is less than batch queue size - just queue them for batch publishing\r
+        if (batchQueueRemainingSize > messages.size()) {\r
+            LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",\r
+                    messages.size(), batchQueueRemainingSize);\r
+            final int batchQueueSize = publisherQueue.addBatchMessages(messages);\r
+            return createPublisherAcceptedResponse(batchQueueSize);\r
+\r
+        } else {\r
+\r
+            // grab all already queued messages, append current messages and force publish them to DMaaP MR topic\r
+            final List<String> queueMessages = publisherQueue.getMessageForPublishing();\r
+            LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +\r
+                    "Publisher Topic.");\r
+            return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));\r
+        }\r
+\r
+    }\r
+\r
+    @Override\r
+    public DMaaPMRPublisherResponse forcePublish(List<String> messages) {\r
+\r
+        LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());\r
+\r
+        final String contentType = publisherConfig.getContentType();\r
+        final String userName = publisherConfig.getUserName();\r
+        final String userPassword = publisherConfig.getUserPassword();\r
+        final HttpPost postRequest = new HttpPost(publisherUri);\r
+\r
+        // add Authorization Header if username and password are present\r
+        final Optional<String> authHeader = getAuthHeader(userName, userPassword);\r
+        if (authHeader.isPresent()) {\r
+            postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());\r
+        } else {\r
+            LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");\r
+        }\r
+\r
+        // Create post string entity\r
+        final String messagesJson = convertToJsonString(messages);\r
+        final StringEntity requestEntity =\r
+                new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));\r
+        postRequest.setEntity(requestEntity);\r
+\r
+        try {\r
+            final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());\r
+            final Integer responseCode = responsePair.getLeft();\r
+            final String responseBody = responsePair.getRight();\r
+            // if messages were published successfully, return successful response\r
+            if (isSuccessfulResponseCode(responseCode)) {\r
+                LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +\r
+                                "Body: {}, Number of Messages published: {}",\r
+                        responseCode, responseBody, messages.size());\r
+\r
+            } else {\r
+                LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +\r
+                        "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);\r
+                addMessagesToRecoveryQueue(publisherQueue, messages);\r
+            }\r
+\r
+            return createPublisherResponse(responseCode, responseBody,\r
+                    getPendingMessages(publisherQueue, publisherConfig));\r
+\r
+        } catch (IOException e) {\r
+            // If IO Error then we need to also put messages in recovery queue\r
+            addMessagesToRecoveryQueue(publisherQueue, messages);\r
+            final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +\r
+                    "Messages will be queued in recovery queue. Messages Size: %d", messages.size());\r
+\r
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
+        }\r
+\r
+    }\r
+\r
+\r
+    @Override\r
+    public DMaaPMRPublisherResponse flush() {\r
+        final List<String> queueMessages = publisherQueue.getMessageForPublishing();\r
+        // If there are no message return 204 (No Content) response code\r
+        if (queueMessages.isEmpty()) {\r
+            LOG.debug("No messages to publish to batch queue. Returning 204 status code");\r
+            return createPublisherNoContentResponse();\r
+        } else {\r
+            // force publish messages in queue\r
+            return forcePublish(queueMessages);\r
+        }\r
+    }\r
+\r
+    @Override\r
+    public Date getPublisherCreationTime() {\r
+        return new Date(publisherCreationTime.getTime());\r
+    }\r
+\r
+    @Override\r
+    public void close() throws Exception {\r
+\r
+        // flush current message in the queue\r
+        int retrialNumber = 0;\r
+        int flushResponseCode;\r
+\r
+        // automatic retries if messages cannot be flushed\r
+        do {\r
+            retrialNumber++;\r
+            DMaaPMRPublisherResponse flushResponse = flush();\r
+            flushResponseCode = flushResponse.getResponseCode();\r
+\r
+            if (!isSuccessfulResponseCode(flushResponseCode)) {\r
+                LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +\r
+                                "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,\r
+                        AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);\r
+\r
+                Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);\r
+            }\r
+        } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&\r
+                !isSuccessfulResponseCode(flushResponseCode));\r
+\r
+        if (!isSuccessfulResponseCode(flushResponseCode)) {\r
+            LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");\r
+        } else {\r
+            LOG.info("Successfully published all batched messages to publisher.");\r
+        }\r
+\r
+        // close http client\r
+        closeableHttpClient.close();\r
+\r
+    }\r
+}\r