Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / BaseDMaaPMRComponent.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.dmaap.service;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonNode;
25 import com.fasterxml.jackson.databind.ObjectMapper;
26 import com.google.common.base.Optional;
27 import org.apache.commons.codec.binary.Base64;
28 import org.apache.commons.lang3.StringEscapeUtils;
29 import org.apache.commons.lang3.tuple.ImmutablePair;
30 import org.apache.commons.lang3.tuple.Pair;
31 import org.apache.http.HttpEntity;
32 import org.apache.http.HttpResponse;
33 import org.apache.http.client.ResponseHandler;
34 import org.apache.http.client.utils.URIBuilder;
35 import org.apache.http.util.EntityUtils;
36 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
37 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
38 import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
39 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
41 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
42 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
43 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
44 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;
45 import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 import java.io.IOException;
50 import java.net.URI;
51 import java.net.URISyntaxException;
52 import java.nio.charset.Charset;
53 import java.util.LinkedList;
54 import java.util.List;
55
56 import javax.annotation.Nonnull;
57 import javax.annotation.Nullable;
58
59 import static java.lang.String.format;
60
61 /**
62  * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
63  *
64  * @author Rajiv Singla . Creation Date: 11/1/2016.
65  */
66 public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
67
68     private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
69
70     private static final ObjectMapper objectMapper = new ObjectMapper();
71
72     /**
73      * Creates Base64 encoded Auth Header for given userName and Password
74      * If either user name of password are null return absent
75      *
76      * @param userName username
77      * @param userPassword user password
78      * @return base64 encoded auth header if username or password are both non null
79      */
80     protected static Optional<String> getAuthHeader(@Nullable final String userName,
81                                                     @Nullable final String userPassword) {
82         if (userName == null || userPassword == null) {
83             return Optional.absent();
84         } else {
85             final String auth = userName + ":" + userPassword;
86             final Charset isoCharset = Charset.forName("ISO-8859-1");
87             byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
88             return Optional.of("Basic " + new String(encodedAuth, isoCharset));
89         }
90     }
91
92
93     /**
94      * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
95      *
96      * @param publisherConfig publisher settings
97      *
98      * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
99      */
100     protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
101         final String hostName = publisherConfig.getHostName();
102         final Integer portNumber = publisherConfig.getPortNumber();
103         final String getProtocol = publisherConfig.getProtocol();
104         final String topicName = publisherConfig.getTopicName();
105         URI publisherURI = null;
106         try {
107             publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
108                     .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();
109         } catch (URISyntaxException e) {
110             final String errorMessage = format("Error while creating publisher URI: %s", e);
111             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
112         }
113         LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
114         return publisherURI;
115     }
116
117
118     /**
119      * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
120      *
121      * @param subscriberConfig subscriber settings
122      *
123      * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
124      */
125     protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
126         final String hostName = subscriberConfig.getHostName();
127         final Integer portNumber = subscriberConfig.getPortNumber();
128         final String getProtocol = subscriberConfig.getProtocol();
129         final String topicName = subscriberConfig.getTopicName();
130         final String consumerId = subscriberConfig.getConsumerId();
131         final String consumerGroup = subscriberConfig.getConsumerGroup();
132         final Integer timeoutMS = subscriberConfig.getTimeoutMS();
133         final Integer messageLimit = subscriberConfig.getMessageLimit();
134         URI subscriberURI = null;
135         try {
136             URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
137                     .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX
138                             + topicName + "/"
139                             + consumerGroup + "/" +
140                             consumerId);
141             // add query params if present
142             if (timeoutMS > 0) {
143                 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());
144             }
145             if (messageLimit > 0) {
146                 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,
147                         messageLimit.toString());
148             }
149             subscriberURI = uriBuilder.build();
150
151         } catch (URISyntaxException e) {
152             final String errorMessage = format("Error while creating subscriber URI: %s", e);
153             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
154         }
155
156         LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
157         return subscriberURI;
158     }
159
160
161     /**
162      *  Creates 202 (Accepted) Response code message
163      *
164      * @param batchQueueSize batch Queue size
165      *
166      * @return response with 202 message code
167      */
168     protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
169         return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
170                 "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
171     }
172
173
174     /**
175      *  Creates 204 (No Content) Response code message
176      *
177      * @return response with 204 message code
178      */
179     protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
180         return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
181                 "No Content - No Messages in batch queue for flushing to MR Topic", 0);
182     }
183
184
185     /**
186      * Creates Publisher Response for given response code, response Message and pending Message Count
187      *
188      * @param responseCode HTTP Status Code
189      * @param responseMessage response message
190      * @param pendingMessages pending messages in batch queue
191      *
192      * @return DMaaP MR Publisher Response
193      */
194     protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
195             responseMessage, int pendingMessages) {
196         return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
197     }
198
199
200     /**
201      * Returns weekly consistent pending messages in batch queue
202      *
203      * @param publisherQueue batch queue
204      * @param publisherConfig publisher settings
205      *
206      * @return pending messages to be published
207      */
208     protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
209                                             @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
210         return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
211     }
212
213
214     /**
215      * Creates Subscriber Response for give response Code, response Message and fetch messages
216      *
217      * @param responseCode response Code
218      * @param responseMessage response Message
219      * @param fetchedMessages fetched messages
220      *
221      * @return DMaaP MR Subscriber Response
222      */
223     protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
224             responseMessage, List<String> fetchedMessages) {
225         if (fetchedMessages == null) {
226             return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
227         } else {
228             return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
229         }
230     }
231
232
233     /**
234      * Custom response handler which extract status code and response body
235      *
236      * @return Pair containing Response code and response body
237      */
238     protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
239         return new ResponseHandler<Pair<Integer, String>>() {
240             @Override
241             public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
242                 // Get Response status code
243                 final int status = response.getStatusLine().getStatusCode();
244                 final HttpEntity responseEntity = response.getEntity();
245                 // If response entity is not null - extract response body as string
246                 String responseEntityString = "";
247                 if (responseEntity != null) {
248                     responseEntityString = EntityUtils.toString(responseEntity);
249                 }
250                 return new ImmutablePair<>(status, responseEntityString);
251             }
252         };
253     }
254
255
256     /**
257      *  Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
258      *  be lost
259      *
260      * @param publisherQueue publisher queue
261      * @param messages recoverable messages to be published to recovery queue
262      */
263     protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
264                                                      List<String> messages) {
265         try {
266             publisherQueue.addRecoverableMessages(messages);
267
268             LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
269                     messages.size(), publisherQueue.getBatchQueueRemainingSize());
270
271         } catch (IllegalStateException e) {
272             final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
273                             "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
274                     messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
275             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
276         }
277     }
278
279
280     /**
281      * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
282      *
283      * @param messages messages that need to parsed to Json Array representation
284      * @return json string representation of message
285      */
286     protected static String convertToJsonString(@Nullable final List<String> messages) {
287         // If messages are null or empty just return empty array
288         if (messages == null || messages.isEmpty()) {
289             return "[]";
290         }
291
292
293         List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
294
295         try {
296             for (String message : messages) {
297                 final JsonNode jsonNode = objectMapper.readTree(message);
298                 jsonMessageObjectsList.add(jsonNode);
299             }
300             return objectMapper.writeValueAsString(jsonMessageObjectsList);
301         } catch (JsonProcessingException e) {
302             final String errorMessage =
303                     format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
304                             messages, e);
305             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
306
307         } catch (IOException e) {
308             final String errorMessage =
309                     format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
310                             messages, e);
311             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
312         }
313     }
314
315
316     /**
317      * Converts subscriber messages json string to List of messages. If message Json String is empty
318      * or null
319      *
320      * @param messagesJsonString json messages String
321      *
322      * @return List containing DMaaP MR Messages
323      */
324     protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
325
326         final LinkedList<String> messages = new LinkedList<>();
327
328         // If message string is not null or not empty parse json message array to List of string messages
329         if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
330                 && !"[]".equals(messagesJsonString.trim())) {
331
332             try {
333                 final List messageList = objectMapper.readValue(messagesJsonString, List.class);
334                 for (Object message : messageList) {
335                     final String jsonMessageString = objectMapper.writeValueAsString(message);
336                     if (jsonMessageString.startsWith("\"") && jsonMessageString.endsWith("\"")) {
337                         final String jsonSubString = jsonMessageString.substring(1, jsonMessageString.length() - 1);
338                         messages.add(StringEscapeUtils.unescapeJson(jsonSubString));
339                     } else {
340                         messages.add(StringEscapeUtils.unescapeJson(jsonMessageString));
341                     }
342                 }
343
344             } catch (IOException e) {
345                 final String errorMessage =
346                         format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
347                                 " Json Error: %s", messagesJsonString, e);
348                 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
349             }
350
351         }
352         return messages;
353     }
354
355
356 }