X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fservice%2Fimpl%2FMMServiceImpl.java;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fservice%2Fimpl%2FMMServiceImpl.java;h=bfa48cf071361d806cd8f5e4d6b8b5bbb3495af2;hb=f052f758c29fa92c21578514202268f3b0430a1a;hp=0000000000000000000000000000000000000000;hpb=388410f85e44de41ef9b340f44c0b4b1b0af59b6;p=dmaap%2Fmessagerouter%2Fmessageservice.git diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java new file mode 100644 index 0000000..bfa48cf --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java @@ -0,0 +1,596 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.dmf.mr.service.impl; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.util.rrConvertor; +import org.apache.http.HttpStatus; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.onap.dmaap.dmf.mr.CambriaApiException; +import org.onap.dmaap.dmf.mr.backends.Consumer; +import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; +import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; +import org.onap.dmaap.dmf.mr.backends.MetricsSet; +import org.onap.dmaap.dmf.mr.backends.Publisher.message; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; +import org.onap.dmaap.dmf.mr.beans.LogDetails; +import org.onap.dmaap.dmf.mr.constants.CambriaConstants; +import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; +import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; +import org.onap.dmaap.dmf.mr.exception.ErrorResponse; +import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException; +import org.onap.dmaap.dmf.mr.metabroker.Topic; +import org.onap.dmaap.dmf.mr.resources.CambriaEventSet; +import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream; +import org.onap.dmaap.dmf.mr.service.MMService; +import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; +import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder; +import org.onap.dmaap.dmf.mr.utils.Utils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Context; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedList; + + +@Service +public class MMServiceImpl implements MMService { + private static final String BATCH_LENGTH = "event.batch.length"; + private static final String TRANSFER_ENCODING = "Transfer-Encoding"; + //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class); + @Autowired + private DMaaPErrorMessages errorMessages; + + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + // HttpServletRequest object + @Context + private HttpServletRequest request; + + // HttpServletResponse object + @Context + private HttpServletResponse response; + + @Override + public void addWhiteList() { + + } + + @Override + public void removeWhiteList() { + + } + + @Override + public void listWhiteList() { + + } + + @Override + public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId) + throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException, + CambriaApiException, IOException { + + + final HttpServletRequest req = ctx.getRequest(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // was this host blacklisted? + final String remoteAddr = Utils.getRemoteAddress(ctx); + + if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + int limit = CambriaConstants.kNoLimit; + + if (req.getParameter("limit") != null) { + limit = Integer.parseInt(req.getParameter("limit")); + } + limit = 1; + + int timeoutMs = CambriaConstants.kNoTimeout; + String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout"); + if (strtimeoutMS != null) + timeoutMs = Integer.parseInt(strtimeoutMS); + // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", + + if (req.getParameter("timeout") != null) { + timeoutMs = Integer.parseInt(req.getParameter("timeout")); + } + + // By default no filter is applied if filter is not passed as a + // parameter in the request URI + String topicFilter = CambriaConstants.kNoFilter; + if (null != req.getParameter("filter")) { + topicFilter = req.getParameter("filter"); + } + // pretty to print the messaages in new line + String prettyval = "0"; + String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty"); + if (null != strPretty) + prettyval = strPretty; + + String metaval = "0"; + String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta"); + if (null != strmeta) + metaval = strmeta; + + final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval); + // withMeta to print offset along with message + final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval); + + // is this user allowed to read this topic? + //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + + if (metatopic == null) { + // no such topic. + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), + errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()), + topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic"); + /* + * if (null==metricTopicname) + * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null) + * if(null==ctx.getRequest().getHeader("Authorization")&& + * !topic.equalsIgnoreCase(metricTopicname)) { if (null != + * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check + * permissions metatopic.checkUserRead(user); } } + */ + + Consumer c = null; + try { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,ctx.getRequest().getRemoteHost()); + + final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs) + .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build(); + coes.setDmaapContext(ctx); + coes.setTopic(metatopic); + + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + + try { + coes.write(baos); + } catch (Exception ex) { + + } + + c.commitOffsets(); + final int sent = coes.getSentCount(); + + metricsSet.consumeTick(sent); + + } catch (UnavailableException excp) { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + null, null, clientId, ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (CambriaApiException excp) { + + throw excp; + } catch (Exception excp) { + + ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null, + Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } finally { + + boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled; + String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + ConsumerFactory.kSetting_EnableCache); + if (null != strkSetting_EnableCache) + kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache); + + if (!kSetting_EnableCache && (c != null)) { + c.close(); + + } + } + return baos.toString(); + } + + @Override + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException, missingReqdSetting { + + //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + + final String remoteAddr = Utils.getRemoteAddress(ctx); + + if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + String topicNameStd = null; + + topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, + "enforced.topic.name.AAF"); + String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "metrics.send.cambria.topic"); + if (null == metricTopicname) + metricTopicname = "msgrtr.apinode.metrics.dmaap"; + boolean topicNameEnforced = false; + if (null != topicNameStd && topic.startsWith(topicNameStd)) { + topicNameEnforced = true; + } + + final HttpServletRequest req = ctx.getRequest(); + + boolean chunked = false; + if (null != req.getHeader(TRANSFER_ENCODING)) { + chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked"); + } + + String mediaType = req.getContentType(); + if (mediaType == null || mediaType.length() == 0) { + mediaType = MimeTypes.kAppGenericBinary; + } + + if (mediaType.contains("charset=UTF-8")) { + mediaType = mediaType.replace("; charset=UTF-8", "").trim(); + } + + if (!topic.equalsIgnoreCase(metricTopicname)) { + pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType); + } else { + pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType); + } + } + + private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request, + final String messageCreationTime, final int messageSequence, final Long batchId, + final boolean transactionEnabled) { + LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId, + transactionEnabled); + logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage())); + msg.setTransactionEnabled(transactionEnabled); + msg.setLogDetails(logDetails); + } + + private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request, + final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) { + LogDetails logDetails = new LogDetails(); + logDetails.setTopicId(topicName); + logDetails.setMessageTimestamp(messageTimestamp); + logDetails.setPublisherId(Utils.getUserApiKey(request)); + logDetails.setPublisherIp(request.getRemoteHost()); + logDetails.setMessageBatchId(batchId); + logDetails.setMessageSequence(String.valueOf(messageSequence)); + logDetails.setTransactionEnabled(transactionEnabled); + logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date())); + logDetails.setServerIp(request.getLocalAddr()); + return logDetails; + } + + private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked, + String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + + long maxEventBatch = 1024L * 16; + String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); + if (null != batchlen) + maxEventBatch = Long.parseLong(batchlen); + + // long maxEventBatch = + // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16); + final LinkedList batch = new LinkedList(); + final ArrayList> pms = new ArrayList>(); + //final ArrayList> kms = new ArrayList>(); + + try { + // for each message... + message m = null; + while ((m = events.next()) != null) { + // add the message to the batch + batch.add(m); + final ProducerRecord data = new ProducerRecord(topic, m.getKey(), + m.getMessage()); + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow > maxEventBatch) { + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + pms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + pms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic); + + // build a responseP + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("serverTimeMs", totalMs); + // DMaaPResponseBuilder.respondOk(ctx, response); + + } catch (Exception excp) { + + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp.getClass().toString().contains("CambriaApiException")) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + + } + ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count + + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null, + null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic, + final String partitionKey, final String requestTime, final boolean chunked, final String mediaType) + throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, + CambriaApiException { + + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + long maxEventBatch = 1024L * 16L; + String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); + if (null != evenlen) + maxEventBatch = Long.parseLong(evenlen); + + final LinkedList batch = new LinkedList(); + final ArrayList> pms = new ArrayList>(); + + message m = null; + int messageSequence = 1; + Long batchId = 1L; + final boolean transactionEnabled = true; + int publishBatchCount = 0; + SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS"); + + // LOG.warn("Batch Start Id: " + + // Utils.getFromattedBatchSequenceId(batchId)); + try { + // for each message... + batchId = DMaaPContext.getBatchID(); + + String responseTransactionId = null; + + while ((m = events.next()) != null) { + + // LOG.warn("Batch Start Id: " + + // Utils.getFromattedBatchSequenceId(batchId)); + + addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId, + transactionEnabled); + messageSequence++; + + // add the message to the batch + batch.add(m); + + responseTransactionId = m.getLogDetails().getTransactionId(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("message", m.getMessage()); + jsonObject.put("transactionId", responseTransactionId); + final ProducerRecord data = new ProducerRecord(topic, m.getKey(), + m.getMessage()); + pms.add(data); + + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow >= maxEventBatch) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id=" + + batchId + "]"); + try { + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + // transactionLogs(batch); + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp.getClass().toString().contains("CambriaApiException")) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + pms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + publishBatchCount = sizeNow; + count += sizeNow; + // batchId++; + String endTime = sdf.format(new Date()); + LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + + ",Batch End Time=" + endTime + "]"); + batchId = DMaaPContext.getBatchID(); + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id=" + + batchId + "]"); + try { + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + // transactionLogs(batch); + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp.getClass().toString().contains("CambriaApiException")) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + pms.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + // batchId++; + String endTime = sdf.format(new Date()); + publishBatchCount = sizeNow; + LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId + + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time=" + + endTime + "]"); + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic); + + // build a response + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("serverTimeMs", totalMs); + + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp.getClass().toString().contains("CambriaApiException")) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } +}