2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.onap.dcae.apod.analytics.dmaap.service;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonNode;
25 import com.fasterxml.jackson.databind.ObjectMapper;
26 import com.google.common.base.Optional;
27 import org.apache.commons.codec.binary.Base64;
28 import org.apache.commons.lang3.StringEscapeUtils;
29 import org.apache.commons.lang3.tuple.ImmutablePair;
30 import org.apache.commons.lang3.tuple.Pair;
31 import org.apache.http.HttpEntity;
32 import org.apache.http.HttpResponse;
33 import org.apache.http.client.ResponseHandler;
34 import org.apache.http.client.utils.URIBuilder;
35 import org.apache.http.util.EntityUtils;
36 import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
37 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
38 import org.onap.dcae.apod.analytics.common.utils.HTTPUtils;
39 import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
40 import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
41 import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
42 import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
43 import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
44 import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;
45 import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 import java.io.IOException;
51 import java.net.URISyntaxException;
52 import java.nio.charset.Charset;
53 import java.util.LinkedList;
54 import java.util.List;
56 import javax.annotation.Nonnull;
57 import javax.annotation.Nullable;
59 import static java.lang.String.format;
62 * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
64 * @author Rajiv Singla . Creation Date: 11/1/2016.
66 public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
68 private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
70 private static final ObjectMapper objectMapper = new ObjectMapper();
73 * Creates Base64 encoded Auth Header for given userName and Password
74 * If either user name of password are null return absent
76 * @param userName username
77 * @param userPassword user password
78 * @return base64 encoded auth header if username or password are both non null
80 protected static Optional<String> getAuthHeader(@Nullable final String userName,
81 @Nullable final String userPassword) {
82 if (userName == null || userPassword == null) {
83 return Optional.absent();
85 final String auth = userName + ":" + userPassword;
86 final Charset isoCharset = Charset.forName("ISO-8859-1");
87 byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
88 return Optional.of("Basic " + new String(encodedAuth, isoCharset));
94 * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
96 * @param publisherConfig publisher settings
98 * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
100 protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
101 final String hostName = publisherConfig.getHostName();
102 final Integer portNumber = publisherConfig.getPortNumber();
103 final String getProtocol = publisherConfig.getProtocol();
104 final String topicName = publisherConfig.getTopicName();
105 URI publisherURI = null;
107 publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
108 .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();
109 } catch (URISyntaxException e) {
110 final String errorMessage = format("Error while creating publisher URI: %s", e);
111 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
113 LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
119 * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
121 * @param subscriberConfig subscriber settings
123 * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
125 protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
126 final String hostName = subscriberConfig.getHostName();
127 final Integer portNumber = subscriberConfig.getPortNumber();
128 final String getProtocol = subscriberConfig.getProtocol();
129 final String topicName = subscriberConfig.getTopicName();
130 final String consumerId = subscriberConfig.getConsumerId();
131 final String consumerGroup = subscriberConfig.getConsumerGroup();
132 final Integer timeoutMS = subscriberConfig.getTimeoutMS();
133 final Integer messageLimit = subscriberConfig.getMessageLimit();
134 URI subscriberURI = null;
136 URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
137 .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX
139 + consumerGroup + "/" +
141 // add query params if present
143 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());
145 if (messageLimit > 0) {
146 uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,
147 messageLimit.toString());
149 subscriberURI = uriBuilder.build();
151 } catch (URISyntaxException e) {
152 final String errorMessage = format("Error while creating subscriber URI: %s", e);
153 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
156 LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
157 return subscriberURI;
162 * Creates 202 (Accepted) Response code message
164 * @param batchQueueSize batch Queue size
166 * @return response with 202 message code
168 protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
169 return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
170 "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
175 * Creates 204 (No Content) Response code message
177 * @return response with 204 message code
179 protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
180 return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
181 "No Content - No Messages in batch queue for flushing to MR Topic", 0);
186 * Creates Publisher Response for given response code, response Message and pending Message Count
188 * @param responseCode HTTP Status Code
189 * @param responseMessage response message
190 * @param pendingMessages pending messages in batch queue
192 * @return DMaaP MR Publisher Response
194 protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
195 responseMessage, int pendingMessages) {
196 return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
201 * Returns weekly consistent pending messages in batch queue
203 * @param publisherQueue batch queue
204 * @param publisherConfig publisher settings
206 * @return pending messages to be published
208 protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
209 @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
210 return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
215 * Creates Subscriber Response for give response Code, response Message and fetch messages
217 * @param responseCode response Code
218 * @param responseMessage response Message
219 * @param fetchedMessages fetched messages
221 * @return DMaaP MR Subscriber Response
223 protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
224 responseMessage, List<String> fetchedMessages) {
225 if (fetchedMessages == null) {
226 return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
228 return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
234 * Custom response handler which extract status code and response body
236 * @return Pair containing Response code and response body
238 protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
239 return new ResponseHandler<Pair<Integer, String>>() {
241 public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
242 // Get Response status code
243 final int status = response.getStatusLine().getStatusCode();
244 final HttpEntity responseEntity = response.getEntity();
245 // If response entity is not null - extract response body as string
246 String responseEntityString = "";
247 if (responseEntity != null) {
248 responseEntityString = EntityUtils.toString(responseEntity);
250 return new ImmutablePair<>(status, responseEntityString);
257 * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
260 * @param publisherQueue publisher queue
261 * @param messages recoverable messages to be published to recovery queue
263 protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
264 List<String> messages) {
266 publisherQueue.addRecoverableMessages(messages);
268 LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
269 messages.size(), publisherQueue.getBatchQueueRemainingSize());
271 } catch (IllegalStateException e) {
272 final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
273 "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
274 messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
275 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
281 * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
283 * @param messages messages that need to parsed to Json Array representation
284 * @return json string representation of message
286 protected static String convertToJsonString(@Nullable final List<String> messages) {
287 // If messages are null or empty just return empty array
288 if (messages == null || messages.isEmpty()) {
293 List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
296 for (String message : messages) {
297 final JsonNode jsonNode = objectMapper.readTree(message);
298 jsonMessageObjectsList.add(jsonNode);
300 return objectMapper.writeValueAsString(jsonMessageObjectsList);
301 } catch (JsonProcessingException e) {
302 final String errorMessage =
303 format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
305 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
307 } catch (IOException e) {
308 final String errorMessage =
309 format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
311 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
317 * Converts subscriber messages json string to List of messages. If message Json String is empty
320 * @param messagesJsonString json messages String
322 * @return List containing DMaaP MR Messages
324 protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
326 final LinkedList<String> messages = new LinkedList<>();
328 // If message string is not null or not empty parse json message array to List of string messages
329 if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
330 && !("[]").equals(messagesJsonString.trim())) {
334 final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
335 // iterate over root node and parse arrays messages
336 for (JsonNode jsonNode : rootNode) {
337 // if array parse it is array of messages
338 final String incomingMessageString = jsonNode.toString();
339 if (jsonNode.isArray()) {
340 final List messageList = objectMapper.readValue(incomingMessageString, List.class);
341 for (Object message : messageList) {
342 final String jsonMessageString = objectMapper.writeValueAsString(message);
343 addUnescapedJsonToMessage(messages, jsonMessageString);
346 // parse it as object
347 addUnescapedJsonToMessage(messages, incomingMessageString);
351 } catch (IOException e) {
352 final String errorMessage =
353 format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
354 " Json Error: %s", messagesJsonString, e);
355 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
363 * Adds unescaped Json messages to given messages list
365 * @param messages message list in which unescaped messages will be added
366 * @param incomingMessageString incoming message string that may need to be escaped
368 private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
369 if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
370 messages.add(StringEscapeUtils.unescapeJson(
371 incomingMessageString.substring(1, incomingMessageString.length() - 1)));
373 messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));