Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / onap / dcae / apod / analytics / dmaap / service / publisher / DMaaPMRPublisherImpl.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.dmaap.service.publisher;
22
23 import com.google.common.base.Optional;
24 import com.google.common.collect.Iterables;
25 import com.google.common.collect.Lists;
26 import com.google.inject.Inject;
27 import com.google.inject.assistedinject.Assisted;
28 import org.apache.commons.lang3.tuple.Pair;
29 import org.apache.http.HttpHeaders;
30 import org.apache.http.client.HttpClient;
31 import org.apache.http.client.methods.HttpPost;
32 import org.apache.http.entity.ContentType;
33 import org.apache.http.entity.StringEntity;
34 import org.apache.http.impl.client.CloseableHttpClient;
35 import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
36 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
37 import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
38 import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
39 import org.onap.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import java.io.IOException;
44 import java.net.URI;
45 import java.util.Date;
46 import java.util.List;
47
48 import static org.onap.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
49 import static java.lang.String.format;
50
51 /**
52  * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
53  *
54  * @author Rajiv Singla . Creation Date: 10/13/2016.
55  */
56 public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
57
58     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
59
60     private final DMaaPMRPublisherConfig publisherConfig;
61     private final CloseableHttpClient closeableHttpClient;
62     private final DMaaPMRPublisherQueue publisherQueue;
63     private final Date publisherCreationTime;
64     private URI publisherUri;
65
66     @Inject
67     public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
68                                 DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
69                                 CloseableHttpClient closeableHttpClient) {
70
71         this.publisherConfig = publisherConfig;
72         final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1;
73         this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
74                 maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());
75         this.closeableHttpClient = closeableHttpClient;
76         this.publisherUri = createPublisherURI(publisherConfig);
77         this.publisherCreationTime = new Date();
78     }
79
80
81     @Override
82     public DMaaPMRPublisherResponse publish(List<String> messages)  {
83
84         final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
85
86         // if messages size is less than batch queue size - just queue them for batch publishing
87         if (batchQueueRemainingSize > messages.size()) {
88             LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
89                     messages.size(), batchQueueRemainingSize);
90             final int batchQueueSize = publisherQueue.addBatchMessages(messages);
91             return createPublisherAcceptedResponse(batchQueueSize);
92
93         } else {
94
95             // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
96             final List<String> queueMessages = publisherQueue.getMessageForPublishing();
97             LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
98                     "Publisher Topic.");
99             return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
100         }
101
102     }
103
104     @Override
105     public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
106
107         LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
108
109         final String contentType = publisherConfig.getContentType();
110         final String userName = publisherConfig.getUserName();
111         final String userPassword = publisherConfig.getUserPassword();
112         final HttpPost postRequest = new HttpPost(publisherUri);
113
114         // add Authorization Header if username and password are present
115         final Optional<String> authHeader = getAuthHeader(userName, userPassword);
116         if (authHeader.isPresent()) {
117             postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
118         } else {
119             LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
120         }
121
122         // Create post string entity
123         final String messagesJson = convertToJsonString(messages);
124         final StringEntity requestEntity =
125                 new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
126         postRequest.setEntity(requestEntity);
127
128         try {
129             final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
130             final Integer responseCode = responsePair.getLeft();
131             final String responseBody = responsePair.getRight();
132             // if messages were published successfully, return successful response
133             if (isSuccessfulResponseCode(responseCode)) {
134                 LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
135                                 "Body: {}, Number of Messages published: {}",
136                         responseCode, responseBody, messages.size());
137
138             } else {
139                 LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
140                         "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
141                 addMessagesToRecoveryQueue(publisherQueue, messages);
142             }
143
144             return createPublisherResponse(responseCode, responseBody,
145                     getPendingMessages(publisherQueue, publisherConfig));
146
147         } catch (IOException e) {
148             // If IO Error then we need to also put messages in recovery queue
149             addMessagesToRecoveryQueue(publisherQueue, messages);
150             final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
151                     "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
152
153             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
154         }
155
156     }
157
158
159     @Override
160     public DMaaPMRPublisherResponse flush() {
161         final List<String> queueMessages = publisherQueue.getMessageForPublishing();
162         // If there are no message return 204 (No Content) response code
163         if (queueMessages.isEmpty()) {
164             LOG.debug("No messages to publish to batch queue. Returning 204 status code");
165             return createPublisherNoContentResponse();
166         } else {
167             // force publish messages in queue
168             return forcePublish(queueMessages);
169         }
170     }
171
172     @Override
173     public Date getPublisherCreationTime() {
174         return new Date(publisherCreationTime.getTime());
175     }
176
177     @Override
178     public void close() throws Exception {
179
180         // flush current message in the queue
181         int retrialNumber = 0;
182         int flushResponseCode;
183
184         // automatic retries if messages cannot be flushed
185         do {
186             retrialNumber++;
187             DMaaPMRPublisherResponse flushResponse = flush();
188             flushResponseCode = flushResponse.getResponseCode();
189
190             if (!isSuccessfulResponseCode(flushResponseCode)) {
191                 LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
192                                 "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
193                         AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);
194
195                 Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);
196             }
197         } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&
198                 !isSuccessfulResponseCode(flushResponseCode));
199
200         if (!isSuccessfulResponseCode(flushResponseCode)) {
201             LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
202         } else {
203             LOG.info("Successfully published all batched messages to publisher.");
204         }
205
206         // close http client
207         closeableHttpClient.close();
208
209     }
210 }