5930f417c46999694c6240315c806a8fe8c17988
[dcaegen2/services/mapper.git] /
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.universalvesadapter.dmaap.MRPublisher;
22
23 import static java.lang.String.format;
24
25
26 import java.io.IOException;
27 import java.net.URI;
28 import java.util.Date;
29 import java.util.List;
30
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.apache.http.HttpHeaders;
33 import org.apache.http.client.HttpClient;
34 import org.apache.http.client.methods.HttpPost;
35 import org.apache.http.entity.ContentType;
36 import org.apache.http.entity.StringEntity;
37 import org.apache.http.impl.client.CloseableHttpClient;
38 import org.onap.universalvesadapter.configs.DMaaPMRPublisherConfig;
39 import org.onap.universalvesadapter.dmaap.BaseDMaaPMRComponent;
40 import org.onap.universalvesadapter.exception.DMaapException;
41 import org.onap.universalvesadapter.utils.HTTPUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.beans.factory.annotation.Value;
45
46 import com.att.aft.dme2.internal.springframework.context.annotation.ComponentScan;
47 import com.google.common.base.Optional;
48 import com.google.common.collect.Iterables;
49 import com.google.common.collect.Lists;
50 import com.google.inject.Inject;
51 import com.google.inject.assistedinject.Assisted;
52
53
54
55 /**
56  * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
57  *
58  * @author Rajiv Singla . Creation Date: 10/13/2016.
59  */
60 @ComponentScan
61 public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
62         
63         @Value("${mr.publisher.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE}")
64     private int publisherMaxFlushRetries;
65   
66
67     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
68
69     public int a =2;
70     private final DMaaPMRPublisherConfig publisherConfig;
71     private final CloseableHttpClient closeableHttpClient;
72     private final DMaaPMRPublisherQueue publisherQueue;
73     private final Date publisherCreationTime;
74     private URI publisherUri;
75
76     @Inject
77     public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
78                                 DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
79                                 CloseableHttpClient closeableHttpClient){
80
81         this.publisherConfig = publisherConfig;
82         final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1;
83         this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
84                 maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());
85         this.closeableHttpClient = closeableHttpClient;
86         this.publisherUri = createPublisherURI(publisherConfig);
87         this.publisherCreationTime = new Date();
88     }
89
90
91     @Override
92     public DMaaPMRPublisherResponse publish(List<String> messages)  {
93
94         final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
95
96         // if messages size is less than batch queue size - just queue them for batch publishing
97         if (batchQueueRemainingSize > messages.size()) {
98             LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
99                     messages.size(), batchQueueRemainingSize);
100             final int batchQueueSize = publisherQueue.addBatchMessages(messages);
101             return createPublisherAcceptedResponse(batchQueueSize);
102
103         } else {
104
105             // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
106             final List<String> queueMessages = publisherQueue.getMessageForPublishing();
107             LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
108                     "Publisher Topic.");
109             return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
110         }
111
112     }
113
114     @Override
115     public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
116
117         LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
118
119         final String contentType = publisherConfig.getContentType();
120         final String userName =(publisherConfig.getUserName().equals("null")) ? null : publisherConfig.getUserName();
121         final String userPassword = (publisherConfig.getUserPassword().equals("null")) ? null : publisherConfig.getUserPassword();
122         final HttpPost postRequest = new HttpPost(publisherUri);
123        
124         // add Authorization Header if username and password are present
125         final Optional<String> authHeader = getAuthHeader(userName, userPassword);
126         if (authHeader.isPresent()) {
127             postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
128         } else {
129             LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
130         }
131
132         // Create post string entity
133         final String messagesJson = convertToJsonString(messages);
134         final StringEntity requestEntity =
135                 new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
136         postRequest.setEntity(requestEntity);
137
138         try {
139             final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
140             final Integer responseCode = responsePair.getLeft();
141             final String responseBody = responsePair.getRight();
142             // if messages were published successfully, return successful response
143             if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
144                 LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
145                                 "Body: {}, Number of Messages published: {}",
146                         responseCode, responseBody, messages.size());
147
148             } else {
149                 LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
150                         "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
151                 addMessagesToRecoveryQueue(publisherQueue, messages);
152             }
153
154             return createPublisherResponse(responseCode, responseBody,
155                     getPendingMessages(publisherQueue, publisherConfig));
156
157         } catch (IOException e) {
158             // If IO Error then we need to also put messages in recovery queue
159             addMessagesToRecoveryQueue(publisherQueue, messages);
160             final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
161                     "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
162
163             throw new DMaapException(errorMessage, LOG, e);
164         }
165
166     }
167
168
169     @Override
170     public DMaaPMRPublisherResponse flush() {
171         final List<String> queueMessages = publisherQueue.getMessageForPublishing();
172         // If there are no message return 204 (No Content) response code
173         if (queueMessages.isEmpty()) {
174             LOG.debug("No messages to publish to batch queue. Returning 204 status code");
175             return createPublisherNoContentResponse();
176         } else {
177             // force publish messages in queue
178             return forcePublish(queueMessages);
179         }
180     }
181
182     @Override
183     public Date getPublisherCreationTime() {
184         return new Date(publisherCreationTime.getTime());
185     }
186
187     @Override
188     public void close() throws Exception {
189
190         // flush current message in the queue
191         int retrialNumber = 0;
192         int flushResponseCode;
193
194         // automatic retries if messages cannot be flushed
195         do {
196             retrialNumber++;
197             DMaaPMRPublisherResponse flushResponse = flush();
198             flushResponseCode = flushResponse.getResponseCode();
199
200             if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
201                 LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
202                                 "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
203                                 publisherMaxFlushRetries);
204
205                 Thread.sleep(publisherMaxFlushRetries);
206             }
207         } while (retrialNumber <= publisherMaxFlushRetries &&
208                 !HTTPUtils.isSuccessfulResponseCode(flushResponseCode));
209
210         if (!HTTPUtils.isSuccessfulResponseCode(flushResponseCode)) {
211             LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
212         } else {
213             LOG.info("Successfully published all batched messages to publisher.");
214         }
215
216         // close http client
217         closeableHttpClient.close();
218
219     }
220 }