2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
\r
23 import com.google.common.base.Optional;
\r
24 import com.google.common.collect.Iterables;
\r
25 import com.google.common.collect.Lists;
\r
26 import com.google.inject.Inject;
\r
27 import com.google.inject.assistedinject.Assisted;
\r
28 import org.apache.commons.lang3.tuple.Pair;
\r
29 import org.apache.http.HttpHeaders;
\r
30 import org.apache.http.client.HttpClient;
\r
31 import org.apache.http.client.methods.HttpPost;
\r
32 import org.apache.http.entity.ContentType;
\r
33 import org.apache.http.entity.StringEntity;
\r
34 import org.apache.http.impl.client.CloseableHttpClient;
\r
35 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
\r
36 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r
37 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
\r
38 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
\r
39 import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
\r
40 import org.slf4j.Logger;
\r
41 import org.slf4j.LoggerFactory;
\r
43 import java.io.IOException;
\r
44 import java.net.URI;
\r
45 import java.util.Date;
\r
46 import java.util.List;
\r
48 import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
\r
49 import static java.lang.String.format;
\r
52 * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
\r
54 * @author Rajiv Singla . Creation Date: 10/13/2016.
\r
56 public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
\r
58 private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
\r
60 private final DMaaPMRPublisherConfig publisherConfig;
\r
61 private final CloseableHttpClient closeableHttpClient;
\r
62 private final DMaaPMRPublisherQueue publisherQueue;
\r
63 private final Date publisherCreationTime;
\r
64 private URI publisherUri;
\r
67 public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
\r
68 DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
\r
69 CloseableHttpClient closeableHttpClient) {
\r
71 this.publisherConfig = publisherConfig;
\r
72 final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1;
\r
73 this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
\r
74 maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());
\r
75 this.closeableHttpClient = closeableHttpClient;
\r
76 this.publisherUri = createPublisherURI(publisherConfig);
\r
77 this.publisherCreationTime = new Date();
\r
82 public DMaaPMRPublisherResponse publish(List<String> messages) {
\r
84 final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
\r
86 // if messages size is less than batch queue size - just queue them for batch publishing
\r
87 if (batchQueueRemainingSize > messages.size()) {
\r
88 LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
\r
89 messages.size(), batchQueueRemainingSize);
\r
90 final int batchQueueSize = publisherQueue.addBatchMessages(messages);
\r
91 return createPublisherAcceptedResponse(batchQueueSize);
\r
95 // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
\r
96 final List<String> queueMessages = publisherQueue.getMessageForPublishing();
\r
97 LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
\r
98 "Publisher Topic.");
\r
99 return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
\r
105 public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
\r
107 LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
\r
109 final String contentType = publisherConfig.getContentType();
\r
110 final String userName = publisherConfig.getUserName();
\r
111 final String userPassword = publisherConfig.getUserPassword();
\r
112 final HttpPost postRequest = new HttpPost(publisherUri);
\r
114 // add Authorization Header if username and password are present
\r
115 final Optional<String> authHeader = getAuthHeader(userName, userPassword);
\r
116 if (authHeader.isPresent()) {
\r
117 postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
\r
119 LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
\r
122 // Create post string entity
\r
123 final String messagesJson = convertToJsonString(messages);
\r
124 final StringEntity requestEntity =
\r
125 new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
\r
126 postRequest.setEntity(requestEntity);
\r
129 final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
\r
130 final Integer responseCode = responsePair.getLeft();
\r
131 final String responseBody = responsePair.getRight();
\r
132 // if messages were published successfully, return successful response
\r
133 if (isSuccessfulResponseCode(responseCode)) {
\r
134 LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
\r
135 "Body: {}, Number of Messages published: {}",
\r
136 responseCode, responseBody, messages.size());
\r
139 LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
\r
140 "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
\r
141 addMessagesToRecoveryQueue(publisherQueue, messages);
\r
144 return createPublisherResponse(responseCode, responseBody,
\r
145 getPendingMessages(publisherQueue, publisherConfig));
\r
147 } catch (IOException e) {
\r
148 // If IO Error then we need to also put messages in recovery queue
\r
149 addMessagesToRecoveryQueue(publisherQueue, messages);
\r
150 final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
\r
151 "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
\r
153 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
160 public DMaaPMRPublisherResponse flush() {
\r
161 final List<String> queueMessages = publisherQueue.getMessageForPublishing();
\r
162 // If there are no message return 204 (No Content) response code
\r
163 if (queueMessages.isEmpty()) {
\r
164 LOG.debug("No messages to publish to batch queue. Returning 204 status code");
\r
165 return createPublisherNoContentResponse();
\r
167 // force publish messages in queue
\r
168 return forcePublish(queueMessages);
\r
173 public Date getPublisherCreationTime() {
\r
174 return new Date(publisherCreationTime.getTime());
\r
178 public void close() throws Exception {
\r
180 // flush current message in the queue
\r
181 int retrialNumber = 0;
\r
182 int flushResponseCode;
\r
184 // automatic retries if messages cannot be flushed
\r
187 DMaaPMRPublisherResponse flushResponse = flush();
\r
188 flushResponseCode = flushResponse.getResponseCode();
\r
190 if (!isSuccessfulResponseCode(flushResponseCode)) {
\r
191 LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
\r
192 "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
\r
193 AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);
\r
195 Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);
\r
197 } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&
\r
198 !isSuccessfulResponseCode(flushResponseCode));
\r
200 if (!isSuccessfulResponseCode(flushResponseCode)) {
\r
201 LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
\r
203 LOG.info("Successfully published all batched messages to publisher.");
\r
206 // close http client
\r
207 closeableHttpClient.close();
\r