Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / BaseDMaaPMRComponent.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.dmaap.service;\r
22 \r
23 import com.fasterxml.jackson.core.JsonProcessingException;\r
24 import com.fasterxml.jackson.databind.JsonNode;\r
25 import com.fasterxml.jackson.databind.ObjectMapper;\r
26 import com.google.common.base.Optional;\r
27 import org.apache.commons.codec.binary.Base64;\r
28 import org.apache.commons.lang3.StringEscapeUtils;\r
29 import org.apache.commons.lang3.tuple.ImmutablePair;\r
30 import org.apache.commons.lang3.tuple.Pair;\r
31 import org.apache.http.HttpEntity;\r
32 import org.apache.http.HttpResponse;\r
33 import org.apache.http.client.ResponseHandler;\r
34 import org.apache.http.client.utils.URIBuilder;\r
35 import org.apache.http.util.EntityUtils;\r
36 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
37 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
38 import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;\r
39 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;\r
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;\r
41 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;\r
42 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;\r
43 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;\r
44 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;\r
45 import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;\r
46 import org.slf4j.Logger;\r
47 import org.slf4j.LoggerFactory;\r
48 \r
49 import java.io.IOException;\r
50 import java.net.URI;\r
51 import java.net.URISyntaxException;\r
52 import java.nio.charset.Charset;\r
53 import java.util.LinkedList;\r
54 import java.util.List;\r
55 \r
56 import javax.annotation.Nonnull;\r
57 import javax.annotation.Nullable;\r
58 \r
59 import static java.lang.String.format;\r
60 \r
61 /**\r
62  * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods\r
63  *\r
64  * @author Rajiv Singla . Creation Date: 11/1/2016.\r
65  */\r
66 public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {\r
67 \r
68     private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);\r
69 \r
70     private static final ObjectMapper objectMapper = new ObjectMapper();\r
71 \r
72     /**\r
73      * Creates Base64 encoded Auth Header for given userName and Password\r
74      * If either user name of password are null return absent\r
75      *\r
76      * @param userName username\r
77      * @param userPassword user password\r
78      * @return base64 encoded auth header if username or password are both non null\r
79      */\r
80     protected static Optional<String> getAuthHeader(@Nullable final String userName,\r
81                                                     @Nullable final String userPassword) {\r
82         if (userName == null || userPassword == null) {\r
83             return Optional.absent();\r
84         } else {\r
85             final String auth = userName + ":" + userPassword;\r
86             final Charset isoCharset = Charset.forName("ISO-8859-1");\r
87             byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));\r
88             return Optional.of("Basic " + new String(encodedAuth, isoCharset));\r
89         }\r
90     }\r
91 \r
92 \r
93     /**\r
94      * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}\r
95      *\r
96      * @param publisherConfig publisher settings\r
97      *\r
98      * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic\r
99      */\r
100     protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {\r
101         final String hostName = publisherConfig.getHostName();\r
102         final Integer portNumber = publisherConfig.getPortNumber();\r
103         final String getProtocol = publisherConfig.getProtocol();\r
104         final String topicName = publisherConfig.getTopicName();\r
105         URI publisherURI = null;\r
106         try {\r
107             publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
108                     .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();\r
109         } catch (URISyntaxException e) {\r
110             final String errorMessage = format("Error while creating publisher URI: %s", e);\r
111             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
112         }\r
113         LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);\r
114         return publisherURI;\r
115     }\r
116 \r
117 \r
118     /**\r
119      * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}\r
120      *\r
121      * @param subscriberConfig subscriber settings\r
122      *\r
123      * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic\r
124      */\r
125     protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {\r
126         final String hostName = subscriberConfig.getHostName();\r
127         final Integer portNumber = subscriberConfig.getPortNumber();\r
128         final String getProtocol = subscriberConfig.getProtocol();\r
129         final String topicName = subscriberConfig.getTopicName();\r
130         final String consumerId = subscriberConfig.getConsumerId();\r
131         final String consumerGroup = subscriberConfig.getConsumerGroup();\r
132         final Integer timeoutMS = subscriberConfig.getTimeoutMS();\r
133         final Integer messageLimit = subscriberConfig.getMessageLimit();\r
134         URI subscriberURI = null;\r
135         try {\r
136             URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)\r
137                     .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX\r
138                             + topicName + "/"\r
139                             + consumerGroup + "/" +\r
140                             consumerId);\r
141             // add query params if present\r
142             if (timeoutMS > 0) {\r
143                 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());\r
144             }\r
145             if (messageLimit > 0) {\r
146                 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,\r
147                         messageLimit.toString());\r
148             }\r
149             subscriberURI = uriBuilder.build();\r
150 \r
151         } catch (URISyntaxException e) {\r
152             final String errorMessage = format("Error while creating subscriber URI: %s", e);\r
153             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
154         }\r
155 \r
156         LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);\r
157         return subscriberURI;\r
158     }\r
159 \r
160 \r
161     /**\r
162      *  Creates 202 (Accepted) Response code message\r
163      *\r
164      * @param batchQueueSize batch Queue size\r
165      *\r
166      * @return response with 202 message code\r
167      */\r
168     protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {\r
169         return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,\r
170                 "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);\r
171     }\r
172 \r
173 \r
174     /**\r
175      *  Creates 204 (No Content) Response code message\r
176      *\r
177      * @return response with 204 message code\r
178      */\r
179     protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {\r
180         return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,\r
181                 "No Content - No Messages in batch queue for flushing to MR Topic", 0);\r
182     }\r
183 \r
184 \r
185     /**\r
186      * Creates Publisher Response for given response code, response Message and pending Message Count\r
187      *\r
188      * @param responseCode HTTP Status Code\r
189      * @param responseMessage response message\r
190      * @param pendingMessages pending messages in batch queue\r
191      *\r
192      * @return DMaaP MR Publisher Response\r
193      */\r
194     protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String\r
195             responseMessage, int pendingMessages) {\r
196         return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);\r
197     }\r
198 \r
199 \r
200     /**\r
201      * Returns weekly consistent pending messages in batch queue\r
202      *\r
203      * @param publisherQueue batch queue\r
204      * @param publisherConfig publisher settings\r
205      *\r
206      * @return pending messages to be published\r
207      */\r
208     protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,\r
209                                             @Nonnull final DMaaPMRPublisherConfig publisherConfig) {\r
210         return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();\r
211     }\r
212 \r
213 \r
214     /**\r
215      * Creates Subscriber Response for give response Code, response Message and fetch messages\r
216      *\r
217      * @param responseCode response Code\r
218      * @param responseMessage response Message\r
219      * @param fetchedMessages fetched messages\r
220      *\r
221      * @return DMaaP MR Subscriber Response\r
222      */\r
223     protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String\r
224             responseMessage, List<String> fetchedMessages) {\r
225         if (fetchedMessages == null) {\r
226             return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);\r
227         } else {\r
228             return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);\r
229         }\r
230     }\r
231 \r
232 \r
233     /**\r
234      * Custom response handler which extract status code and response body\r
235      *\r
236      * @return Pair containing Response code and response body\r
237      */\r
238     protected static ResponseHandler<Pair<Integer, String>> responseHandler() {\r
239         return new ResponseHandler<Pair<Integer, String>>() {\r
240             @Override\r
241             public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {\r
242                 // Get Response status code\r
243                 final int status = response.getStatusLine().getStatusCode();\r
244                 final HttpEntity responseEntity = response.getEntity();\r
245                 // If response entity is not null - extract response body as string\r
246                 String responseEntityString = "";\r
247                 if (responseEntity != null) {\r
248                     responseEntityString = EntityUtils.toString(responseEntity);\r
249                 }\r
250                 return new ImmutablePair<>(status, responseEntityString);\r
251             }\r
252         };\r
253     }\r
254 \r
255 \r
256     /**\r
257      *  Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will\r
258      *  be lost\r
259      *\r
260      * @param publisherQueue publisher queue\r
261      * @param messages recoverable messages to be published to recovery queue\r
262      */\r
263     protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,\r
264                                                      List<String> messages) {\r
265         try {\r
266             publisherQueue.addRecoverableMessages(messages);\r
267 \r
268             LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",\r
269                     messages.size(), publisherQueue.getBatchQueueRemainingSize());\r
270 \r
271         } catch (IllegalStateException e) {\r
272             final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +\r
273                             "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",\r
274                     messages.size(), publisherQueue.getRecoveryQueueRemainingSize());\r
275             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
276         }\r
277     }\r
278 \r
279 \r
280     /**\r
281      * Converts List of messages to Json String Array which can be published to DMaaP MR topic.\r
282      *\r
283      * @param messages messages that need to parsed to Json Array representation\r
284      * @return json string representation of message\r
285      */\r
286     protected static String convertToJsonString(@Nullable final List<String> messages) {\r
287         // If messages are null or empty just return empty array\r
288         if (messages == null || messages.isEmpty()) {\r
289             return "[]";\r
290         }\r
291 \r
292 \r
293         List<JsonNode> jsonMessageObjectsList = new LinkedList<>();\r
294 \r
295         try {\r
296             for (String message : messages) {\r
297                 final JsonNode jsonNode = objectMapper.readTree(message);\r
298                 jsonMessageObjectsList.add(jsonNode);\r
299             }\r
300             return objectMapper.writeValueAsString(jsonMessageObjectsList);\r
301         } catch (JsonProcessingException e) {\r
302             final String errorMessage =\r
303                     format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",\r
304                             messages, e);\r
305             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
306 \r
307         } catch (IOException e) {\r
308             final String errorMessage =\r
309                     format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",\r
310                             messages, e);\r
311             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
312         }\r
313     }\r
314 \r
315 \r
316     /**\r
317      * Converts subscriber messages json string to List of messages. If message Json String is empty\r
318      * or null\r
319      *\r
320      * @param messagesJsonString json messages String\r
321      *\r
322      * @return List containing DMaaP MR Messages\r
323      */\r
324     protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {\r
325 \r
326         final LinkedList<String> messages = new LinkedList<>();\r
327 \r
328         // If message string is not null or not empty parse json message array to List of string messages\r
329         if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()\r
330                 && !("[]").equals(messagesJsonString.trim())) {\r
331 \r
332             try {\r
333                 // get root node\r
334                 final JsonNode rootNode = objectMapper.readTree(messagesJsonString);\r
335                 // iterate over root node and parse arrays messages\r
336                 for (JsonNode jsonNode : rootNode) {\r
337                     // if array parse it is array of messages\r
338                     final String incomingMessageString = jsonNode.toString();\r
339                     if (jsonNode.isArray()) {\r
340                         final List messageList = objectMapper.readValue(incomingMessageString, List.class);\r
341                         for (Object message : messageList) {\r
342                             final String jsonMessageString = objectMapper.writeValueAsString(message);\r
343                             addUnescapedJsonToMessage(messages, jsonMessageString);\r
344                         }\r
345                     } else {\r
346                         // parse it as object\r
347                         addUnescapedJsonToMessage(messages, incomingMessageString);\r
348                     }\r
349                 }\r
350 \r
351             } catch (IOException e) {\r
352                 final String errorMessage =\r
353                         format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +\r
354                                 " Json Error: %s", messagesJsonString, e);\r
355                 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
356             }\r
357 \r
358         }\r
359         return messages;\r
360     }\r
361 \r
362     /**\r
363      * Adds unescaped Json messages to given messages list\r
364      *\r
365      * @param messages message list in which unescaped messages will be added\r
366      * @param incomingMessageString incoming message string that may need to be escaped\r
367      */\r
368     private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {\r
369         if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {\r
370             messages.add(StringEscapeUtils.unescapeJson(\r
371                     incomingMessageString.substring(1, incomingMessageString.length() - 1)));\r
372         } else {\r
373             messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));\r
374         }\r
375     }\r
376 \r
377 \r
378 }\r