2  * ===============================LICENSE_START======================================
 
   4  * ================================================================================
 
   5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   8  *  you may not use this file except in compliance with the License.
 
   9  *   You may obtain a copy of the License at
 
  11  *          http://www.apache.org/licenses/LICENSE-2.0
 
  13  *  Unless required by applicable law or agreed to in writing, software
 
  14  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  *  See the License for the specific language governing permissions and
 
  17  *  limitations under the License.
 
  18  *  ============================LICENSE_END===========================================
 
  21 package org.onap.universalvesadapter.dmaap.MRPublisher;
 
  23 import static java.lang.String.format;
 
  26 import java.io.IOException;
 
  28 import java.util.Date;
 
  29 import java.util.List;
 
  31 import org.apache.commons.lang3.tuple.Pair;
 
  32 import org.apache.http.HttpHeaders;
 
  33 import org.apache.http.client.HttpClient;
 
  34 import org.apache.http.client.methods.HttpPost;
 
  35 import org.apache.http.entity.ContentType;
 
  36 import org.apache.http.entity.StringEntity;
 
  37 import org.apache.http.impl.client.CloseableHttpClient;
 
  38 import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
 
  39 import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent;
 
  40 import org.onap.universalvesadapter.exception.DMaapException;
 
  41 import org.onap.universalvesadapter.utils.HTTPUtils;
 
  42 import org.slf4j.Logger;
 
  43 import org.slf4j.LoggerFactory;
 
  44 import org.springframework.beans.factory.annotation.Value;
 
  46 import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
 
  47 import com.google.common.base.Optional;
 
  48 import com.google.common.collect.Iterables;
 
  49 import com.google.common.collect.Lists;
 
  50 import com.google.inject.Inject;
 
  51 import com.google.inject.assistedinject.Assisted;
 
  56  * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
 
  58  * @author Rajiv Singla . Creation Date: 10/13/2016.
 
  61 public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
 
  63         @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
 
  64     private int publisherMaxFlushRetries;
 
  67     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
 
  70     private final DMaaPMRPublisherConfig publisherConfig;
 
  71     private final CloseableHttpClient closeableHttpClient;
 
  72     private final DMaaPMRPublisherQueue publisherQueue;
 
  73     private final Date publisherCreationTime;
 
  74     private URI publisherUri;
 
  77     public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
 
  78                                 DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
 
  79                                 CloseableHttpClient closeableHttpClient){
 
  81         this.publisherConfig = publisherConfig;
 
  82         final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1;
 
  83         this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
 
  84                 maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());
 
  85         this.closeableHttpClient = closeableHttpClient;
 
  86         this.publisherUri = createPublisherURI(publisherConfig);
 
  87         this.publisherCreationTime = new Date();
 
  92     public DMaaPMRPublisherResponse publish(List<String> messages)  {
 
  94         final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
 
  96         // if messages size is less than batch queue size - just queue them for batch publishing
 
  97         if (batchQueueRemainingSize > messages.size()) {
 
  98             LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
 
  99                     messages.size(), batchQueueRemainingSize);
 
 100             final int batchQueueSize = publisherQueue.addBatchMessages(messages);
 
 101             return createPublisherAcceptedResponse(batchQueueSize);
 
 105             // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
 
 106             final List<String> queueMessages = publisherQueue.getMessageForPublishing();
 
 107             LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
 
 109             return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
 
 115     public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
 
 117         LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
 
 119         final String contentType = publisherConfig.getContentType();
 
 120         final String userName =(publisherConfig.getUserName().equals("null")) ? null : publisherConfig.getUserName();
 
 121         final String userPassword = (publisherConfig.getUserPassword().equals("null")) ? null : publisherConfig.getUserPassword();
 
 122         final HttpPost postRequest = new HttpPost(publisherUri);
 
 124         // add Authorization Header if username and password are present
 
 125         final Optional<String> authHeader = getAuthHeader(userName, userPassword);
 
 126         if (authHeader.isPresent()) {
 
 127             postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
 
 129             LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
 
 132         // Create post string entity
 
 133         final String messagesJson = convertToJsonString(messages);
 
 134         final StringEntity requestEntity =
 
 135                 new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
 
 136         postRequest.setEntity(requestEntity);
 
 139             final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
 
 140             final Integer responseCode = responsePair.getLeft();
 
 141             final String responseBody = responsePair.getRight();
 
 142             // if messages were published successfully, return successful response
 
 143             if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
 
 144                 LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
 
 145                                 "Body: {}, Number of Messages published: {}",
 
 146                         responseCode, responseBody, messages.size());
 
 149                 LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
 
 150                         "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
 
 151                 addMessagesToRecoveryQueue(publisherQueue, messages);
 
 154             return createPublisherResponse(responseCode, responseBody,
 
 155                     getPendingMessages(publisherQueue, publisherConfig));
 
 157         } catch (IOException e) {
 
 158             // If IO Error then we need to also put messages in recovery queue
 
 159             addMessagesToRecoveryQueue(publisherQueue, messages);
 
 160             final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
 
 161                     "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
 
 163             throw new DMaapException(errorMessage, LOG, e);
 
 170     public DMaaPMRPublisherResponse flush() {
 
 171         final List<String> queueMessages = publisherQueue.getMessageForPublishing();
 
 172         // If there are no message return 204 (No Content) response code
 
 173         if (queueMessages.isEmpty()) {
 
 174             LOG.debug("No messages to publish to batch queue. Returning 204 status code");
 
 175             return createPublisherNoContentResponse();
 
 177             // force publish messages in queue
 
 178             return forcePublish(queueMessages);
 
 183     public Date getPublisherCreationTime() {
 
 184         return new Date(publisherCreationTime.getTime());
 
 188     public void close() throws Exception {
 
 190         // flush current message in the queue
 
 191         int retrialNumber = 0;
 
 192         int flushResponseCode;
 
 194         // automatic retries if messages cannot be flushed
 
 197             DMaaPMRPublisherResponse flushResponse = flush();
 
 198             flushResponseCode = flushResponse.getResponseCode();
 
 200             if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
 
 201                 LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
 
 202                                 "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
 
 203                                 publisherMaxFlushRetries);
 
 205                 Thread.sleep(publisherMaxFlushRetries);
 
 207         } while (retrialNumber <= publisherMaxFlushRetries &&
 
 208                 !HTTPUtils.isSuccessfulResponseCode(flushResponseCode));
 
 210         if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
 
 211             LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
 
 213             LOG.info("Successfully published all batched messages to publisher.");
 
 217         closeableHttpClient.close();