1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
31 import com.att.nsa.util.rrConvertor;
32 import org.apache.http.HttpStatus;
33 import org.apache.kafka.clients.producer.ProducerRecord;
34 import org.json.JSONObject;
35 import org.json.JSONTokener;
36 import org.onap.dmaap.dmf.mr.CambriaApiException;
37 import org.onap.dmaap.dmf.mr.backends.Consumer;
38 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
39 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
40 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
41 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
42 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
43 import org.onap.dmaap.dmf.mr.beans.LogDetails;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
47 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
48 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
49 import org.onap.dmaap.dmf.mr.metabroker.Topic;
50 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
51 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
52 import org.onap.dmaap.dmf.mr.service.MMService;
53 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
54 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import org.springframework.beans.factory.annotation.Autowired;
57 import org.springframework.beans.factory.annotation.Qualifier;
58 import org.springframework.stereotype.Service;
60 import javax.servlet.http.HttpServletRequest;
61 import javax.servlet.http.HttpServletResponse;
62 import javax.ws.rs.core.Context;
63 import java.io.ByteArrayOutputStream;
64 import java.io.IOException;
65 import java.io.InputStream;
66 import java.text.SimpleDateFormat;
67 import java.util.ArrayList;
68 import java.util.Date;
69 import java.util.LinkedList;
73 public class MMServiceImpl implements MMService {
74 private static final String BATCH_LENGTH = "event.batch.length";
75 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
76 //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class);
77 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class);
79 private DMaaPErrorMessages errorMessages;
82 @Qualifier("configurationReader")
83 private ConfigurationReader configReader;
85 // HttpServletRequest object
87 private HttpServletRequest request;
89 // HttpServletResponse object
91 private HttpServletResponse response;
94 public void addWhiteList() {
99 public void removeWhiteList() {
104 public void listWhiteList() {
109 public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
110 throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
111 CambriaApiException, IOException {
114 final HttpServletRequest req = ctx.getRequest();
115 ByteArrayOutputStream baos = new ByteArrayOutputStream();
117 // was this host blacklisted?
118 final String remoteAddr = Utils.getRemoteAddress(ctx);
120 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
122 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
123 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
124 "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
125 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
126 ctx.getRequest().getRemoteHost(), null, null);
127 LOG.info(errRes.toString());
128 throw new CambriaApiException(errRes);
131 int limit = CambriaConstants.kNoLimit;
133 if (req.getParameter("limit") != null) {
134 limit = Integer.parseInt(req.getParameter("limit"));
138 int timeoutMs = CambriaConstants.kNoTimeout;
139 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
140 if (strtimeoutMS != null)
141 timeoutMs = Integer.parseInt(strtimeoutMS);
142 // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
144 if (req.getParameter("timeout") != null) {
145 timeoutMs = Integer.parseInt(req.getParameter("timeout"));
148 // By default no filter is applied if filter is not passed as a
149 // parameter in the request URI
150 String topicFilter = CambriaConstants.kNoFilter;
151 if (null != req.getParameter("filter")) {
152 topicFilter = req.getParameter("filter");
154 // pretty to print the messaages in new line
155 String prettyval = "0";
156 String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
157 if (null != strPretty)
158 prettyval = strPretty;
160 String metaval = "0";
161 String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
165 final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
166 // withMeta to print offset along with message
167 final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
169 // is this user allowed to read this topic?
170 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
171 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
173 if (metatopic == null) {
175 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
176 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
177 errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
178 topic, null, null, clientId, ctx.getRequest().getRemoteHost());
179 LOG.info(errRes.toString());
180 throw new CambriaApiException(errRes);
182 //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
184 * if (null==metricTopicname)
185 * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null)
186 * if(null==ctx.getRequest().getHeader("Authorization")&&
187 * !topic.equalsIgnoreCase(metricTopicname)) { if (null !=
188 * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check
189 * permissions metatopic.checkUserRead(user); } }
194 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
196 c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,ctx.getRequest().getRemoteHost());
198 final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
199 .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
200 coes.setDmaapContext(ctx);
201 coes.setTopic(metatopic);
203 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
207 } catch (Exception ex) {
212 final int sent = coes.getSentCount();
214 metricsSet.consumeTick(sent);
216 } catch (UnavailableException excp) {
218 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
219 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
220 errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
221 null, null, clientId, ctx.getRequest().getRemoteHost());
222 LOG.info(errRes.toString());
223 throw new CambriaApiException(errRes);
225 } catch (CambriaApiException excp) {
228 } catch (Exception excp) {
230 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
232 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
233 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
234 "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
235 Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
236 LOG.info(errRes.toString());
237 throw new CambriaApiException(errRes);
240 boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
241 String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
242 ConsumerFactory.kSetting_EnableCache);
243 if (null != strkSetting_EnableCache)
244 kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
246 if (!kSetting_EnableCache && (c != null)) {
251 return baos.toString();
255 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
256 final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
257 CambriaApiException, IOException, missingReqdSetting {
259 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
260 //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
262 final String remoteAddr = Utils.getRemoteAddress(ctx);
264 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
266 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
267 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
268 "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
269 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
270 ctx.getRequest().getRemoteHost(), null, null);
271 LOG.info(errRes.toString());
272 throw new CambriaApiException(errRes);
275 String topicNameStd = null;
277 topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
278 "enforced.topic.name.AAF");
279 String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
280 "metrics.send.cambria.topic");
281 if (null == metricTopicname)
282 metricTopicname = "msgrtr.apinode.metrics.dmaap";
283 boolean topicNameEnforced = false;
284 if (null != topicNameStd && topic.startsWith(topicNameStd)) {
285 topicNameEnforced = true;
288 final HttpServletRequest req = ctx.getRequest();
290 boolean chunked = false;
291 if (null != req.getHeader(TRANSFER_ENCODING)) {
292 chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
295 String mediaType = req.getContentType();
296 if (mediaType == null || mediaType.length() == 0) {
297 mediaType = MimeTypes.kAppGenericBinary;
300 if (mediaType.contains("charset=UTF-8")) {
301 mediaType = mediaType.replace("; charset=UTF-8", "").trim();
304 if (!topic.equalsIgnoreCase(metricTopicname)) {
305 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
307 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
311 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
312 final String messageCreationTime, final int messageSequence, final Long batchId,
313 final boolean transactionEnabled) {
314 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
316 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
317 msg.setTransactionEnabled(transactionEnabled);
318 msg.setLogDetails(logDetails);
321 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
322 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
323 LogDetails logDetails = new LogDetails();
324 logDetails.setTopicId(topicName);
325 logDetails.setMessageTimestamp(messageTimestamp);
326 logDetails.setPublisherId(Utils.getUserApiKey(request));
327 logDetails.setPublisherIp(request.getRemoteHost());
328 logDetails.setMessageBatchId(batchId);
329 logDetails.setMessageSequence(String.valueOf(messageSequence));
330 logDetails.setTransactionEnabled(transactionEnabled);
331 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
332 logDetails.setServerIp(request.getLocalAddr());
336 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
337 String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
338 CambriaApiException, IOException {
339 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
341 // setup the event set
342 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
344 // start processing, building a batch to push to the backend
345 final long startMs = System.currentTimeMillis();
348 long maxEventBatch = 1024L * 16;
349 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
350 if (null != batchlen)
351 maxEventBatch = Long.parseLong(batchlen);
353 // long maxEventBatch =
354 // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
355 final LinkedList<message> batch = new LinkedList<message>();
356 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
357 //final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
360 // for each message...
362 while ((m = events.next()) != null) {
363 // add the message to the batch
365 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
367 // check if the batch is full
368 final int sizeNow = batch.size();
369 if (sizeNow > maxEventBatch) {
370 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
373 metricsSet.publishTick(sizeNow);
378 // send the pending batch
379 final int sizeNow = batch.size();
381 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
384 metricsSet.publishTick(sizeNow);
388 final long endMs = System.currentTimeMillis();
389 final long totalMs = endMs - startMs;
391 LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
394 final JSONObject response = new JSONObject();
395 response.put("count", count);
396 response.put("serverTimeMs", totalMs);
397 // DMaaPResponseBuilder.respondOk(ctx, response);
399 } catch (Exception excp) {
401 int status = HttpStatus.SC_NOT_FOUND;
402 String errorMsg = null;
403 if (excp.getClass().toString().contains("CambriaApiException")) {
404 status = ((CambriaApiException) excp).getStatus();
405 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
406 JSONObject errObject = new JSONObject(jsonTokener);
407 errorMsg = (String) errObject.get("message");
410 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
411 errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
413 null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
415 LOG.info(errRes.toString());
416 throw new CambriaApiException(errRes);
421 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
422 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
423 throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
424 CambriaApiException {
426 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
428 // setup the event set
429 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
431 // start processing, building a batch to push to the backend
432 final long startMs = System.currentTimeMillis();
434 long maxEventBatch = 1024L * 16L;
435 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
437 maxEventBatch = Long.parseLong(evenlen);
439 final LinkedList<message> batch = new LinkedList<message>();
440 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
443 int messageSequence = 1;
445 final boolean transactionEnabled = true;
446 int publishBatchCount = 0;
447 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
449 // LOG.warn("Batch Start Id: " +
450 // Utils.getFromattedBatchSequenceId(batchId));
452 // for each message...
453 batchId = DMaaPContext.getBatchID();
455 String responseTransactionId = null;
457 while ((m = events.next()) != null) {
459 // LOG.warn("Batch Start Id: " +
460 // Utils.getFromattedBatchSequenceId(batchId));
462 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
466 // add the message to the batch
469 responseTransactionId = m.getLogDetails().getTransactionId();
471 JSONObject jsonObject = new JSONObject();
472 jsonObject.put("message", m.getMessage());
473 jsonObject.put("transactionId", responseTransactionId);
474 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
478 // check if the batch is full
479 final int sizeNow = batch.size();
480 if (sizeNow >= maxEventBatch) {
481 String startTime = sdf.format(new Date());
482 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
485 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
486 // transactionLogs(batch);
487 for (message msg : batch) {
488 LogDetails logDetails = msg.getLogDetails();
489 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
491 } catch (Exception excp) {
493 int status = HttpStatus.SC_NOT_FOUND;
494 String errorMsg = null;
495 if (excp.getClass().toString().contains("CambriaApiException")) {
496 status = ((CambriaApiException) excp).getStatus();
497 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
498 JSONObject errObject = new JSONObject(jsonTokener);
499 errorMsg = (String) errObject.get("message");
501 ErrorResponse errRes = new ErrorResponse(status,
502 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
503 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
504 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
505 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
506 ctx.getRequest().getRemoteHost(), null, null);
507 LOG.info(errRes.toString());
508 throw new CambriaApiException(errRes);
512 metricsSet.publishTick(sizeNow);
513 publishBatchCount = sizeNow;
516 String endTime = sdf.format(new Date());
517 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
518 + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
519 + ",Batch End Time=" + endTime + "]");
520 batchId = DMaaPContext.getBatchID();
524 // send the pending batch
525 final int sizeNow = batch.size();
527 String startTime = sdf.format(new Date());
528 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
531 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
532 // transactionLogs(batch);
533 for (message msg : batch) {
534 LogDetails logDetails = msg.getLogDetails();
535 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
537 } catch (Exception excp) {
538 int status = HttpStatus.SC_NOT_FOUND;
539 String errorMsg = null;
540 if (excp.getClass().toString().contains("CambriaApiException")) {
541 status = ((CambriaApiException) excp).getStatus();
542 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
543 JSONObject errObject = new JSONObject(jsonTokener);
544 errorMsg = (String) errObject.get("message");
547 ErrorResponse errRes = new ErrorResponse(status,
548 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
549 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
550 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
551 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
552 ctx.getRequest().getRemoteHost(), null, null);
553 LOG.info(errRes.toString());
554 throw new CambriaApiException(errRes);
557 metricsSet.publishTick(sizeNow);
560 String endTime = sdf.format(new Date());
561 publishBatchCount = sizeNow;
562 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
563 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
567 final long endMs = System.currentTimeMillis();
568 final long totalMs = endMs - startMs;
570 LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
573 final JSONObject response = new JSONObject();
574 response.put("count", count);
575 response.put("serverTimeMs", totalMs);
577 } catch (Exception excp) {
578 int status = HttpStatus.SC_NOT_FOUND;
579 String errorMsg = null;
580 if (excp.getClass().toString().contains("CambriaApiException")) {
581 status = ((CambriaApiException) excp).getStatus();
582 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
583 JSONObject errObject = new JSONObject(jsonTokener);
584 errorMsg = (String) errObject.get("message");
587 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
588 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
589 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
590 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
591 ctx.getRequest().getRemoteHost(), null, null);
592 LOG.info(errRes.toString());
593 throw new CambriaApiException(errRes);