2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.onap.universalvesadapter.dmaap.MRPublisher;
23 import static java.lang.String.format;
26 import java.io.IOException;
28 import java.util.Date;
29 import java.util.List;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.apache.http.HttpHeaders;
33 import org.apache.http.client.HttpClient;
34 import org.apache.http.client.methods.HttpPost;
35 import org.apache.http.entity.ContentType;
36 import org.apache.http.entity.StringEntity;
37 import org.apache.http.impl.client.CloseableHttpClient;
38 import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
39 import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent;
40 import org.onap.universalvesadapter.exception.DMaapException;
41 import org.onap.universalvesadapter.utils.HTTPUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.beans.factory.annotation.Value;
46 import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
47 import com.google.common.base.Optional;
48 import com.google.common.collect.Iterables;
49 import com.google.common.collect.Lists;
50 import com.google.inject.Inject;
51 import com.google.inject.assistedinject.Assisted;
56 * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
58 * @author Rajiv Singla . Creation Date: 10/13/2016.
61 public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
63 @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
64 private int publisherMaxFlushRetries;
67 private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
70 private final DMaaPMRPublisherConfig publisherConfig;
71 private final CloseableHttpClient closeableHttpClient;
72 private final DMaaPMRPublisherQueue publisherQueue;
73 private final Date publisherCreationTime;
74 private URI publisherUri;
/**
 * Creates a publisher for the topic described by {@code publisherConfig}.
 *
 * <p>The constructor takes an {@link Assisted} argument next to container-supplied collaborators,
 * so it is annotated with {@link Inject}; Guice assisted injection only considers constructors
 * that carry that annotation.
 *
 * @param publisherConfig publisher settings; its max batch size and max recovery queue size are
 *        used to size the publisher queue, and the publisher URI is derived from it
 * @param dMaaPMRPublisherQueueFactory factory that creates the queue used for batching and recovery
 * @param closeableHttpClient HTTP client used to post messages; closed by {@link #close()}
 */
@Inject
public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
                            DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
                            CloseableHttpClient closeableHttpClient) {

    this.publisherConfig = publisherConfig;
    // a configured batch size of zero or less is treated as a batch size of 1
    final int maxBatchSize = Math.max(publisherConfig.getMaxBatchSize(), 1);
    this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
            maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());
    this.closeableHttpClient = closeableHttpClient;
    this.publisherUri = createPublisherURI(publisherConfig);
    this.publisherCreationTime = new Date();
}
92 public DMaaPMRPublisherResponse publish(List<String> messages) {
94 final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
96 // if messages size is less than batch queue size - just queue them for batch publishing
97 if (batchQueueRemainingSize > messages.size()) {
98 LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
99 messages.size(), batchQueueRemainingSize);
100 final int batchQueueSize = publisherQueue.addBatchMessages(messages);
101 return createPublisherAcceptedResponse(batchQueueSize);
105 // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
106 final List<String> queueMessages = publisherQueue.getMessageForPublishing();
107 LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
109 return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
115 public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
117 LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
119 final String contentType = publisherConfig.getContentType();
120 final String userName =(publisherConfig.getUserName().equals("null")) ? null : publisherConfig.getUserName();
121 final String userPassword = (publisherConfig.getUserPassword().equals("null")) ? null : publisherConfig.getUserPassword();
122 final HttpPost postRequest = new HttpPost(publisherUri);
124 // add Authorization Header if username and password are present
125 final Optional<String> authHeader = getAuthHeader(userName, userPassword);
126 if (authHeader.isPresent()) {
127 postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
129 LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
132 // Create post string entity
133 final String messagesJson = convertToJsonString(messages);
134 final StringEntity requestEntity =
135 new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
136 postRequest.setEntity(requestEntity);
139 final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
140 final Integer responseCode = responsePair.getLeft();
141 final String responseBody = responsePair.getRight();
142 // if messages were published successfully, return successful response
143 if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
144 LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
145 "Body: {}, Number of Messages published: {}",
146 responseCode, responseBody, messages.size());
149 LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
150 "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
151 addMessagesToRecoveryQueue(publisherQueue, messages);
154 return createPublisherResponse(responseCode, responseBody,
155 getPendingMessages(publisherQueue, publisherConfig));
157 } catch (IOException e) {
158 // If IO Error then we need to also put messages in recovery queue
159 addMessagesToRecoveryQueue(publisherQueue, messages);
160 final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
161 "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
163 throw new DMaapException(errorMessage, LOG, e);
170 public DMaaPMRPublisherResponse flush() {
171 final List<String> queueMessages = publisherQueue.getMessageForPublishing();
172 // If there are no message return 204 (No Content) response code
173 if (queueMessages.isEmpty()) {
174 LOG.debug("No messages to publish to batch queue. Returning 204 status code");
175 return createPublisherNoContentResponse();
177 // force publish messages in queue
178 return forcePublish(queueMessages);
183 public Date getPublisherCreationTime() {
184 return new Date(publisherCreationTime.getTime());
188 public void close() throws Exception {
190 // flush current message in the queue
191 int retrialNumber = 0;
192 int flushResponseCode;
194 // automatic retries if messages cannot be flushed
197 DMaaPMRPublisherResponse flushResponse = flush();
198 flushResponseCode = flushResponse.getResponseCode();
200 if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
201 LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
202 "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
203 publisherMaxFlushRetries);
205 Thread.sleep(publisherMaxFlushRetries);
207 } while (retrialNumber <= publisherMaxFlushRetries &&
208 !HTTPUtils.isSuccessfulResponseCode(flushResponseCode));
210 if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
211 LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
213 LOG.info("Successfully published all batched messages to publisher.");
217 closeableHttpClient.close();