1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.text.SimpleDateFormat;
28 import java.util.ArrayList;
29 import java.util.Date;
30 import java.util.LinkedList;
32 import javax.servlet.http.HttpServletRequest;
33 import javax.servlet.http.HttpServletResponse;
34 import javax.ws.rs.core.Context;
36 import org.apache.http.HttpStatus;
37 import org.apache.kafka.clients.producer.ProducerRecord;
38 import org.json.JSONObject;
39 import org.json.JSONTokener;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.beans.factory.annotation.Qualifier;
42 import org.springframework.stereotype.Service;
44 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
45 import org.onap.dmaap.dmf.mr.CambriaApiException;
46 import org.onap.dmaap.dmf.mr.backends.Consumer;
47 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
48 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
49 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
50 import org.onap.dmaap.dmf.mr.backends.Publisher;
51 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
52 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
53 import org.onap.dmaap.dmf.mr.beans.LogDetails;
54 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
55 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
56 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
57 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
58 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
59 import org.onap.dmaap.dmf.mr.metabroker.Topic;
60 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
61 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
62 import org.onap.dmaap.dmf.mr.service.MMService;
63 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
64 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
65 import org.onap.dmaap.dmf.mr.utils.Utils;
66 import com.att.eelf.configuration.EELFLogger;
67 import com.att.eelf.configuration.EELFManager;
68 import com.att.nsa.configs.ConfigDbException;
69 import com.att.nsa.drumlin.service.standards.MimeTypes;
70 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
71 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
72 import com.att.nsa.util.rrConvertor;
77 public class MMServiceImpl implements MMService {
78 private static final String BATCH_LENGTH = "event.batch.length";
79 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
80 //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class);
81 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class);
83 private DMaaPErrorMessages errorMessages;
86 @Qualifier("configurationReader")
87 private ConfigurationReader configReader;
89 // HttpServletRequest object
91 private HttpServletRequest request;
93 // HttpServletResponse object
95 private HttpServletResponse response;
98 public void addWhiteList() {
103 public void removeWhiteList() {
108 public void listWhiteList() {
113 public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
114 throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
115 CambriaApiException, IOException {
118 final HttpServletRequest req = ctx.getRequest();
119 ByteArrayOutputStream baos = new ByteArrayOutputStream();
121 // was this host blacklisted?
122 final String remoteAddr = Utils.getRemoteAddress(ctx);
124 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
126 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
127 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
128 "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
129 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
130 ctx.getRequest().getRemoteHost(), null, null);
131 LOG.info(errRes.toString());
132 throw new CambriaApiException(errRes);
135 int limit = CambriaConstants.kNoLimit;
137 if (req.getParameter("limit") != null) {
138 limit = Integer.parseInt(req.getParameter("limit"));
142 int timeoutMs = CambriaConstants.kNoTimeout;
143 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
144 if (strtimeoutMS != null)
145 timeoutMs = Integer.parseInt(strtimeoutMS);
146 // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
148 if (req.getParameter("timeout") != null) {
149 timeoutMs = Integer.parseInt(req.getParameter("timeout"));
152 // By default no filter is applied if filter is not passed as a
153 // parameter in the request URI
154 String topicFilter = CambriaConstants.kNoFilter;
155 if (null != req.getParameter("filter")) {
156 topicFilter = req.getParameter("filter");
158 // pretty to print the messaages in new line
159 String prettyval = "0";
160 String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
161 if (null != strPretty)
162 prettyval = strPretty;
164 String metaval = "0";
165 String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
169 final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
170 // withMeta to print offset along with message
171 final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
173 // is this user allowed to read this topic?
174 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
175 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
177 if (metatopic == null) {
179 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
180 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
181 errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
182 topic, null, null, clientId, ctx.getRequest().getRemoteHost());
183 LOG.info(errRes.toString());
184 throw new CambriaApiException(errRes);
186 //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
188 * if (null==metricTopicname)
189 * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null)
190 * if(null==ctx.getRequest().getHeader("Authorization")&&
191 * !topic.equalsIgnoreCase(metricTopicname)) { if (null !=
192 * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check
193 * permissions metatopic.checkUserRead(user); } }
198 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
200 c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,ctx.getRequest().getRemoteHost());
202 final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
203 .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
204 coes.setDmaapContext(ctx);
205 coes.setTopic(metatopic);
207 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
211 } catch (Exception ex) {
216 final int sent = coes.getSentCount();
218 metricsSet.consumeTick(sent);
220 } catch (UnavailableException excp) {
222 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
223 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
224 errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
225 null, null, clientId, ctx.getRequest().getRemoteHost());
226 LOG.info(errRes.toString());
227 throw new CambriaApiException(errRes);
229 } catch (CambriaApiException excp) {
232 } catch (Exception excp) {
234 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
236 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
237 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
238 "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
239 Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
240 LOG.info(errRes.toString());
241 throw new CambriaApiException(errRes);
244 boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
245 String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
246 ConsumerFactory.kSetting_EnableCache);
247 if (null != strkSetting_EnableCache)
248 kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
250 if (!kSetting_EnableCache && (c != null)) {
255 return baos.toString();
259 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
260 final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
261 CambriaApiException, IOException, missingReqdSetting {
263 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
264 //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
266 final String remoteAddr = Utils.getRemoteAddress(ctx);
268 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
270 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
271 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
272 "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
273 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
274 ctx.getRequest().getRemoteHost(), null, null);
275 LOG.info(errRes.toString());
276 throw new CambriaApiException(errRes);
279 String topicNameStd = null;
281 topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
282 "enforced.topic.name.AAF");
283 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
284 "metrics.send.cambria.topic");
285 if (null == metricTopicname)
286 metricTopicname = "msgrtr.apinode.metrics.dmaap";
287 boolean topicNameEnforced = false;
288 if (null != topicNameStd && topic.startsWith(topicNameStd)) {
289 topicNameEnforced = true;
292 final HttpServletRequest req = ctx.getRequest();
294 boolean chunked = false;
295 if (null != req.getHeader(TRANSFER_ENCODING)) {
296 chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
299 String mediaType = req.getContentType();
300 if (mediaType == null || mediaType.length() == 0) {
301 mediaType = MimeTypes.kAppGenericBinary;
304 if (mediaType.contains("charset=UTF-8")) {
305 mediaType = mediaType.replace("; charset=UTF-8", "").trim();
308 if (!topic.equalsIgnoreCase(metricTopicname)) {
309 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
311 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
315 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
316 final String messageCreationTime, final int messageSequence, final Long batchId,
317 final boolean transactionEnabled) {
318 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
320 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
321 msg.setTransactionEnabled(transactionEnabled);
322 msg.setLogDetails(logDetails);
325 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
326 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
327 LogDetails logDetails = new LogDetails();
328 logDetails.setTopicId(topicName);
329 logDetails.setMessageTimestamp(messageTimestamp);
330 logDetails.setPublisherId(Utils.getUserApiKey(request));
331 logDetails.setPublisherIp(request.getRemoteHost());
332 logDetails.setMessageBatchId(batchId);
333 logDetails.setMessageSequence(String.valueOf(messageSequence));
334 logDetails.setTransactionEnabled(transactionEnabled);
335 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
336 logDetails.setServerIp(request.getLocalAddr());
340 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
341 String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
342 CambriaApiException, IOException {
343 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
345 // setup the event set
346 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
348 // start processing, building a batch to push to the backend
349 final long startMs = System.currentTimeMillis();
352 long maxEventBatch = 1024L * 16;
353 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
354 if (null != batchlen)
355 maxEventBatch = Long.parseLong(batchlen);
357 // long maxEventBatch =
358 // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
359 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
360 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
361 //final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
364 // for each message...
365 Publisher.message m = null;
366 while ((m = events.next()) != null) {
367 // add the message to the batch
369 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
371 // check if the batch is full
372 final int sizeNow = batch.size();
373 if (sizeNow > maxEventBatch) {
374 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
377 metricsSet.publishTick(sizeNow);
382 // send the pending batch
383 final int sizeNow = batch.size();
385 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
388 metricsSet.publishTick(sizeNow);
392 final long endMs = System.currentTimeMillis();
393 final long totalMs = endMs - startMs;
395 LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
398 final JSONObject response = new JSONObject();
399 response.put("count", count);
400 response.put("serverTimeMs", totalMs);
401 // DMaaPResponseBuilder.respondOk(ctx, response);
403 } catch (Exception excp) {
405 int status = HttpStatus.SC_NOT_FOUND;
406 String errorMsg = null;
407 if (excp.getClass().toString().contains("CambriaApiException")) {
408 status = ((CambriaApiException) excp).getStatus();
409 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
410 JSONObject errObject = new JSONObject(jsonTokener);
411 errorMsg = (String) errObject.get("message");
414 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
415 errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
417 null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
419 LOG.info(errRes.toString());
420 throw new CambriaApiException(errRes);
425 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
426 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
427 throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
428 CambriaApiException {
430 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
432 // setup the event set
433 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
435 // start processing, building a batch to push to the backend
436 final long startMs = System.currentTimeMillis();
438 long maxEventBatch = 1024 * 16;
439 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
441 maxEventBatch = Long.parseLong(evenlen);
443 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
444 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
446 Publisher.message m = null;
447 int messageSequence = 1;
449 final boolean transactionEnabled = true;
450 int publishBatchCount = 0;
451 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
453 // LOG.warn("Batch Start Id: " +
454 // Utils.getFromattedBatchSequenceId(batchId));
456 // for each message...
457 batchId = DMaaPContext.getBatchID();
459 String responseTransactionId = null;
461 while ((m = events.next()) != null) {
463 // LOG.warn("Batch Start Id: " +
464 // Utils.getFromattedBatchSequenceId(batchId));
466 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
470 // add the message to the batch
473 responseTransactionId = m.getLogDetails().getTransactionId();
475 JSONObject jsonObject = new JSONObject();
476 jsonObject.put("message", m.getMessage());
477 jsonObject.put("transactionId", responseTransactionId);
478 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
482 // check if the batch is full
483 final int sizeNow = batch.size();
484 if (sizeNow >= maxEventBatch) {
485 String startTime = sdf.format(new Date());
486 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
489 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
490 // transactionLogs(batch);
491 for (message msg : batch) {
492 LogDetails logDetails = msg.getLogDetails();
493 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
495 } catch (Exception excp) {
497 int status = HttpStatus.SC_NOT_FOUND;
498 String errorMsg = null;
499 if (excp.getClass().toString().contains("CambriaApiException")) {
500 status = ((CambriaApiException) excp).getStatus();
501 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
502 JSONObject errObject = new JSONObject(jsonTokener);
503 errorMsg = (String) errObject.get("message");
505 ErrorResponse errRes = new ErrorResponse(status,
506 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
507 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
508 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
509 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
510 ctx.getRequest().getRemoteHost(), null, null);
511 LOG.info(errRes.toString());
512 throw new CambriaApiException(errRes);
516 metricsSet.publishTick(sizeNow);
517 publishBatchCount = sizeNow;
520 String endTime = sdf.format(new Date());
521 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
522 + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
523 + ",Batch End Time=" + endTime + "]");
524 batchId = DMaaPContext.getBatchID();
528 // send the pending batch
529 final int sizeNow = batch.size();
531 String startTime = sdf.format(new Date());
532 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
535 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
536 // transactionLogs(batch);
537 for (message msg : batch) {
538 LogDetails logDetails = msg.getLogDetails();
539 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
541 } catch (Exception excp) {
542 int status = HttpStatus.SC_NOT_FOUND;
543 String errorMsg = null;
544 if (excp.getClass().toString().contains("CambriaApiException")) {
545 status = ((CambriaApiException) excp).getStatus();
546 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
547 JSONObject errObject = new JSONObject(jsonTokener);
548 errorMsg = (String) errObject.get("message");
551 ErrorResponse errRes = new ErrorResponse(status,
552 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
553 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
554 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
555 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
556 ctx.getRequest().getRemoteHost(), null, null);
557 LOG.info(errRes.toString());
558 throw new CambriaApiException(errRes);
561 metricsSet.publishTick(sizeNow);
564 String endTime = sdf.format(new Date());
565 publishBatchCount = sizeNow;
566 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
567 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
571 final long endMs = System.currentTimeMillis();
572 final long totalMs = endMs - startMs;
574 LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
577 final JSONObject response = new JSONObject();
578 response.put("count", count);
579 response.put("serverTimeMs", totalMs);
581 } catch (Exception excp) {
582 int status = HttpStatus.SC_NOT_FOUND;
583 String errorMsg = null;
584 if (excp.getClass().toString().contains("CambriaApiException")) {
585 status = ((CambriaApiException) excp).getStatus();
586 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
587 JSONObject errObject = new JSONObject(jsonTokener);
588 errorMsg = (String) errObject.get("message");
591 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
592 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
593 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
594 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
595 ctx.getRequest().getRemoteHost(), null, null);
596 LOG.info(errRes.toString());
597 throw new CambriaApiException(errRes);