2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.dmaap.service;
\r
23 import com.fasterxml.jackson.core.JsonProcessingException;
\r
24 import com.fasterxml.jackson.databind.JsonNode;
\r
25 import com.fasterxml.jackson.databind.ObjectMapper;
\r
26 import com.google.common.base.Optional;
\r
27 import org.apache.commons.codec.binary.Base64;
\r
28 import org.apache.commons.lang3.StringEscapeUtils;
\r
29 import org.apache.commons.lang3.tuple.ImmutablePair;
\r
30 import org.apache.commons.lang3.tuple.Pair;
\r
31 import org.apache.http.HttpEntity;
\r
32 import org.apache.http.HttpResponse;
\r
33 import org.apache.http.client.ResponseHandler;
\r
34 import org.apache.http.client.utils.URIBuilder;
\r
35 import org.apache.http.util.EntityUtils;
\r
36 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
\r
37 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r
38 import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
\r
39 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
\r
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
\r
41 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
\r
42 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
\r
43 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
\r
44 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;
\r
45 import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;
\r
46 import org.slf4j.Logger;
\r
47 import org.slf4j.LoggerFactory;
\r
49 import java.io.IOException;
\r
50 import java.net.URI;
\r
51 import java.net.URISyntaxException;
\r
52 import java.nio.charset.Charset;
\r
53 import java.util.LinkedList;
\r
54 import java.util.List;
\r
56 import javax.annotation.Nonnull;
\r
57 import javax.annotation.Nullable;
\r
59 import static java.lang.String.format;
\r
62 * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
\r
64 * @author Rajiv Singla . Creation Date: 11/1/2016.
\r
66 public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
\r
// SLF4J logger for this class; used for URI/queue log statements and passed to
// DCAEAnalyticsRuntimeException when conversion or URI creation fails
private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);

// Single Jackson mapper instance reused by the JSON conversion helpers of this class
private static final ObjectMapper objectMapper = new ObjectMapper();
\r
73 * Creates Base64 encoded Auth Header for given userName and Password
\r
74 * If either user name of password are null return absent
\r
76 * @param userName username
\r
77 * @param userPassword user password
\r
78 * @return base64 encoded auth header if username or password are both non null
\r
80 protected static Optional<String> getAuthHeader(@Nullable final String userName,
\r
81 @Nullable final String userPassword) {
\r
82 if (userName == null || userPassword == null) {
\r
83 return Optional.absent();
\r
85 final String auth = userName + ":" + userPassword;
\r
86 final Charset isoCharset = Charset.forName("ISO-8859-1");
\r
87 byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
\r
88 return Optional.of("Basic " + new String(encodedAuth, isoCharset));
\r
94 * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
\r
96 * @param publisherConfig publisher settings
\r
98 * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
\r
100 protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
\r
101 final String hostName = publisherConfig.getHostName();
\r
102 final Integer portNumber = publisherConfig.getPortNumber();
\r
103 final String getProtocol = publisherConfig.getProtocol();
\r
104 final String topicName = publisherConfig.getTopicName();
\r
105 URI publisherURI = null;
\r
107 publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
\r
108 .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();
\r
109 } catch (URISyntaxException e) {
\r
110 final String errorMessage = format("Error while creating publisher URI: %s", e);
\r
111 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
113 LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
\r
114 return publisherURI;
\r
119 * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
\r
121 * @param subscriberConfig subscriber settings
\r
123 * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
\r
125 protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
\r
126 final String hostName = subscriberConfig.getHostName();
\r
127 final Integer portNumber = subscriberConfig.getPortNumber();
\r
128 final String getProtocol = subscriberConfig.getProtocol();
\r
129 final String topicName = subscriberConfig.getTopicName();
\r
130 final String consumerId = subscriberConfig.getConsumerId();
\r
131 final String consumerGroup = subscriberConfig.getConsumerGroup();
\r
132 final Integer timeoutMS = subscriberConfig.getTimeoutMS();
\r
133 final Integer messageLimit = subscriberConfig.getMessageLimit();
\r
134 URI subscriberURI = null;
\r
136 URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
\r
137 .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX
\r
139 + consumerGroup + "/" +
\r
141 // add query params if present
\r
142 if (timeoutMS > 0) {
\r
143 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());
\r
145 if (messageLimit > 0) {
\r
146 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,
\r
147 messageLimit.toString());
\r
149 subscriberURI = uriBuilder.build();
\r
151 } catch (URISyntaxException e) {
\r
152 final String errorMessage = format("Error while creating subscriber URI: %s", e);
\r
153 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
156 LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
\r
157 return subscriberURI;
\r
162 * Creates 202 (Accepted) Response code message
\r
164 * @param batchQueueSize batch Queue size
\r
166 * @return response with 202 message code
\r
168 protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
\r
169 return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
\r
170 "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
\r
175 * Creates 204 (No Content) Response code message
\r
177 * @return response with 204 message code
\r
179 protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
\r
180 return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
\r
181 "No Content - No Messages in batch queue for flushing to MR Topic", 0);
\r
186 * Creates Publisher Response for given response code, response Message and pending Message Count
\r
188 * @param responseCode HTTP Status Code
\r
189 * @param responseMessage response message
\r
190 * @param pendingMessages pending messages in batch queue
\r
192 * @return DMaaP MR Publisher Response
\r
194 protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
\r
195 responseMessage, int pendingMessages) {
\r
196 return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
\r
201 * Returns weekly consistent pending messages in batch queue
\r
203 * @param publisherQueue batch queue
\r
204 * @param publisherConfig publisher settings
\r
206 * @return pending messages to be published
\r
208 protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
\r
209 @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
\r
210 return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
\r
215 * Creates Subscriber Response for give response Code, response Message and fetch messages
\r
217 * @param responseCode response Code
\r
218 * @param responseMessage response Message
\r
219 * @param fetchedMessages fetched messages
\r
221 * @return DMaaP MR Subscriber Response
\r
223 protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
\r
224 responseMessage, List<String> fetchedMessages) {
\r
225 if (fetchedMessages == null) {
\r
226 return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
\r
228 return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
\r
234 * Custom response handler which extract status code and response body
\r
236 * @return Pair containing Response code and response body
\r
238 protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
\r
239 return new ResponseHandler<Pair<Integer, String>>() {
\r
241 public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
\r
242 // Get Response status code
\r
243 final int status = response.getStatusLine().getStatusCode();
\r
244 final HttpEntity responseEntity = response.getEntity();
\r
245 // If response entity is not null - extract response body as string
\r
246 String responseEntityString = "";
\r
247 if (responseEntity != null) {
\r
248 responseEntityString = EntityUtils.toString(responseEntity);
\r
250 return new ImmutablePair<>(status, responseEntityString);
\r
257 * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
\r
260 * @param publisherQueue publisher queue
\r
261 * @param messages recoverable messages to be published to recovery queue
\r
263 protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
\r
264 List<String> messages) {
\r
266 publisherQueue.addRecoverableMessages(messages);
\r
268 LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
\r
269 messages.size(), publisherQueue.getBatchQueueRemainingSize());
\r
271 } catch (IllegalStateException e) {
\r
272 final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
\r
273 "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
\r
274 messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
\r
275 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
281 * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
\r
283 * @param messages messages that need to parsed to Json Array representation
\r
284 * @return json string representation of message
\r
286 protected static String convertToJsonString(@Nullable final List<String> messages) {
\r
287 // If messages are null or empty just return empty array
\r
288 if (messages == null || messages.isEmpty()) {
\r
293 List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
\r
296 for (String message : messages) {
\r
297 final JsonNode jsonNode = objectMapper.readTree(message);
\r
298 jsonMessageObjectsList.add(jsonNode);
\r
300 return objectMapper.writeValueAsString(jsonMessageObjectsList);
\r
301 } catch (JsonProcessingException e) {
\r
302 final String errorMessage =
\r
303 format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
\r
305 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
307 } catch (IOException e) {
\r
308 final String errorMessage =
\r
309 format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
\r
311 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
317 * Converts subscriber messages json string to List of messages. If message Json String is empty
\r
320 * @param messagesJsonString json messages String
\r
322 * @return List containing DMaaP MR Messages
\r
324 protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
\r
326 final LinkedList<String> messages = new LinkedList<>();
\r
328 // If message string is not null or not empty parse json message array to List of string messages
\r
329 if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
\r
330 && !("[]").equals(messagesJsonString.trim())) {
\r
334 final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
\r
335 // iterate over root node and parse arrays messages
\r
336 for (JsonNode jsonNode : rootNode) {
\r
337 // if array parse it is array of messages
\r
338 final String incomingMessageString = jsonNode.toString();
\r
339 if (jsonNode.isArray()) {
\r
340 final List messageList = objectMapper.readValue(incomingMessageString, List.class);
\r
341 for (Object message : messageList) {
\r
342 final String jsonMessageString = objectMapper.writeValueAsString(message);
\r
343 addUnescapedJsonToMessage(messages, jsonMessageString);
\r
346 // parse it as object
\r
347 addUnescapedJsonToMessage(messages, incomingMessageString);
\r
351 } catch (IOException e) {
\r
352 final String errorMessage =
\r
353 format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
\r
354 " Json Error: %s", messagesJsonString, e);
\r
355 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
363 * Adds unescaped Json messages to given messages list
\r
365 * @param messages message list in which unescaped messages will be added
\r
366 * @param incomingMessageString incoming message string that may need to be escaped
\r
368 private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
\r
369 if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
\r
370 messages.add(StringEscapeUtils.unescapeJson(
\r
371 incomingMessageString.substring(1, incomingMessageString.length() - 1)));
\r
373 messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));
\r