TCA:Updated eventName logging messages
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / publisher / DMaaPMRPublisherImpl.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;\r
22 \r
23 import com.google.common.base.Optional;\r
24 import com.google.common.collect.Iterables;\r
25 import com.google.common.collect.Lists;\r
26 import com.google.inject.Inject;\r
27 import com.google.inject.assistedinject.Assisted;\r
28 import org.apache.commons.lang3.tuple.Pair;\r
29 import org.apache.http.HttpHeaders;\r
30 import org.apache.http.client.HttpClient;\r
31 import org.apache.http.client.methods.HttpPost;\r
32 import org.apache.http.entity.ContentType;\r
33 import org.apache.http.entity.StringEntity;\r
34 import org.apache.http.impl.client.CloseableHttpClient;\r
35 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
36 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
37 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
38 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;\r
39 import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;\r
40 import org.slf4j.Logger;\r
41 import org.slf4j.LoggerFactory;\r
42 \r
43 import java.io.IOException;\r
44 import java.net.URI;\r
45 import java.util.Date;\r
46 import java.util.List;\r
47 \r
48 import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;\r
49 import static java.lang.String.format;\r
50 \r
51 /**\r
52  * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}\r
53  *\r
54  * @author Rajiv Singla . Creation Date: 10/13/2016.\r
55  */\r
56 public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {\r
57 \r
58     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);\r
59 \r
60     private final DMaaPMRPublisherConfig publisherConfig;\r
61     private final CloseableHttpClient closeableHttpClient;\r
62     private final DMaaPMRPublisherQueue publisherQueue;\r
63     private final Date publisherCreationTime;\r
64     private URI publisherUri;\r
65 \r
66     @Inject\r
67     public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,\r
68                                 DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,\r
69                                 CloseableHttpClient closeableHttpClient) {\r
70 \r
71         this.publisherConfig = publisherConfig;\r
72         final int maxBatchSize = publisherConfig.getMaxBatchSize() > 0 ? publisherConfig.getMaxBatchSize() : 1;\r
73         this.publisherQueue = dMaaPMRPublisherQueueFactory.create(\r
74                 maxBatchSize, publisherConfig.getMaxRecoveryQueueSize());\r
75         this.closeableHttpClient = closeableHttpClient;\r
76         this.publisherUri = createPublisherURI(publisherConfig);\r
77         this.publisherCreationTime = new Date();\r
78     }\r
79 \r
80 \r
81     @Override\r
82     public DMaaPMRPublisherResponse publish(List<String> messages)  {\r
83 \r
84         final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();\r
85 \r
86         // if messages size is less than batch queue size - just queue them for batch publishing\r
87         if (batchQueueRemainingSize > messages.size()) {\r
88             LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",\r
89                     messages.size(), batchQueueRemainingSize);\r
90             final int batchQueueSize = publisherQueue.addBatchMessages(messages);\r
91             return createPublisherAcceptedResponse(batchQueueSize);\r
92 \r
93         } else {\r
94 \r
95             // grab all already queued messages, append current messages and force publish them to DMaaP MR topic\r
96             final List<String> queueMessages = publisherQueue.getMessageForPublishing();\r
97             LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +\r
98                     "Publisher Topic.");\r
99             return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));\r
100         }\r
101 \r
102     }\r
103 \r
104     @Override\r
105     public DMaaPMRPublisherResponse forcePublish(List<String> messages) {\r
106 \r
107         LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());\r
108 \r
109         final String contentType = publisherConfig.getContentType();\r
110         final String userName = publisherConfig.getUserName();\r
111         final String userPassword = publisherConfig.getUserPassword();\r
112         final HttpPost postRequest = new HttpPost(publisherUri);\r
113 \r
114         // add Authorization Header if username and password are present\r
115         final Optional<String> authHeader = getAuthHeader(userName, userPassword);\r
116         if (authHeader.isPresent()) {\r
117             postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());\r
118         } else {\r
119             LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");\r
120         }\r
121 \r
122         // Create post string entity\r
123         final String messagesJson = convertToJsonString(messages);\r
124         final StringEntity requestEntity =\r
125                 new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));\r
126         postRequest.setEntity(requestEntity);\r
127 \r
128         try {\r
129             final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());\r
130             final Integer responseCode = responsePair.getLeft();\r
131             final String responseBody = responsePair.getRight();\r
132             // if messages were published successfully, return successful response\r
133             if (isSuccessfulResponseCode(responseCode)) {\r
134                 LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +\r
135                                 "Body: {}, Number of Messages published: {}",\r
136                         responseCode, responseBody, messages.size());\r
137 \r
138             } else {\r
139                 LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +\r
140                         "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);\r
141                 addMessagesToRecoveryQueue(publisherQueue, messages);\r
142             }\r
143 \r
144             return createPublisherResponse(responseCode, responseBody,\r
145                     getPendingMessages(publisherQueue, publisherConfig));\r
146 \r
147         } catch (IOException e) {\r
148             // If IO Error then we need to also put messages in recovery queue\r
149             addMessagesToRecoveryQueue(publisherQueue, messages);\r
150             final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +\r
151                     "Messages will be queued in recovery queue. Messages Size: %d", messages.size());\r
152 \r
153             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
154         }\r
155 \r
156     }\r
157 \r
158 \r
159     @Override\r
160     public DMaaPMRPublisherResponse flush() {\r
161         final List<String> queueMessages = publisherQueue.getMessageForPublishing();\r
162         // If there are no message return 204 (No Content) response code\r
163         if (queueMessages.isEmpty()) {\r
164             LOG.debug("No messages to publish to batch queue. Returning 204 status code");\r
165             return createPublisherNoContentResponse();\r
166         } else {\r
167             // force publish messages in queue\r
168             return forcePublish(queueMessages);\r
169         }\r
170     }\r
171 \r
172     @Override\r
173     public Date getPublisherCreationTime() {\r
174         return new Date(publisherCreationTime.getTime());\r
175     }\r
176 \r
177     @Override\r
178     public void close() throws Exception {\r
179 \r
180         // flush current message in the queue\r
181         int retrialNumber = 0;\r
182         int flushResponseCode;\r
183 \r
184         // automatic retries if messages cannot be flushed\r
185         do {\r
186             retrialNumber++;\r
187             DMaaPMRPublisherResponse flushResponse = flush();\r
188             flushResponseCode = flushResponse.getResponseCode();\r
189 \r
190             if (!isSuccessfulResponseCode(flushResponseCode)) {\r
191                 LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +\r
192                                 "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,\r
193                         AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);\r
194 \r
195                 Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);\r
196             }\r
197         } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&\r
198                 !isSuccessfulResponseCode(flushResponseCode));\r
199 \r
200         if (!isSuccessfulResponseCode(flushResponseCode)) {\r
201             LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");\r
202         } else {\r
203             LOG.info("Successfully published all batched messages to publisher.");\r
204         }\r
205 \r
206         // close http client\r
207         closeableHttpClient.close();\r
208 \r
209     }\r
210 }\r