bump the version
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / service / impl / MMServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.service.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.text.SimpleDateFormat;
28 import java.util.ArrayList;
29 import java.util.Date;
30 import java.util.LinkedList;
31
32 import javax.servlet.http.HttpServletRequest;
33 import javax.servlet.http.HttpServletResponse;
34 import javax.ws.rs.core.Context;
35
36 import org.apache.http.HttpStatus;
37 import org.apache.kafka.clients.producer.ProducerRecord;
38 import org.json.JSONObject;
39 import org.json.JSONTokener;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.beans.factory.annotation.Qualifier;
42 import org.springframework.stereotype.Service;
43
44 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
45 import com.att.dmf.mr.CambriaApiException;
46 import com.att.dmf.mr.backends.Consumer;
47 import com.att.dmf.mr.backends.ConsumerFactory;
48 import com.att.dmf.mr.backends.ConsumerFactory.UnavailableException;
49 import com.att.dmf.mr.backends.MetricsSet;
50 import com.att.dmf.mr.backends.Publisher;
51 import com.att.dmf.mr.backends.Publisher.message;
52 import com.att.dmf.mr.beans.DMaaPContext;
53 import com.att.dmf.mr.beans.LogDetails;
54 import com.att.dmf.mr.constants.CambriaConstants;
55 import com.att.dmf.mr.exception.DMaaPErrorMessages;
56 import com.att.dmf.mr.exception.DMaaPResponseCode;
57 import com.att.dmf.mr.exception.ErrorResponse;
58 import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
59 import com.att.dmf.mr.metabroker.Topic;
60 import com.att.dmf.mr.resources.CambriaEventSet;
61 import com.att.dmf.mr.resources.CambriaOutboundEventStream;
62 import com.att.dmf.mr.service.MMService;
63 import com.att.dmf.mr.utils.ConfigurationReader;
64 import com.att.dmf.mr.utils.DMaaPResponseBuilder;
65 import com.att.dmf.mr.utils.Utils;
66 import com.att.eelf.configuration.EELFLogger;
67 import com.att.eelf.configuration.EELFManager;
68 import com.att.nsa.configs.ConfigDbException;
69 import com.att.nsa.drumlin.service.standards.MimeTypes;
70 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
71 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
72 import com.att.nsa.util.rrConvertor;
73
74
75
76 @Service
77 public class MMServiceImpl implements MMService {
        // Property key, looked up in the msgRtr property set, that overrides the default
        // maximum number of events sent to the publisher in one batch.
        private static final String BATCH_LENGTH = "event.batch.length";
        // Name of the HTTP request header inspected to detect a chunked request body.
        private static final String TRANSFER_ENCODING = "Transfer-Encoding";
        //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class);
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class);
        // Supplies the message fragments used when building ErrorResponse texts.
        @Autowired
        private DMaaPErrorMessages errorMessages;

        // Injected "configurationReader" bean. NOTE(review): the methods visible in this part of
        // the file obtain their reader from the DMaaPContext instead - confirm this field is used.
        @Autowired
        @Qualifier("configurationReader")
        private ConfigurationReader configReader;

        // HttpServletRequest object
        @Context
        private HttpServletRequest request;

        // HttpServletResponse object
        @Context
        private HttpServletResponse response;
96
        /**
         * Placeholder implementation: the body is empty, so calling it has no effect.
         */
        @Override
        public void addWhiteList() {
                // intentionally empty - no whitelist handling is implemented here

        }
101
        /**
         * Placeholder implementation: the body is empty, so calling it has no effect.
         */
        @Override
        public void removeWhiteList() {
                // intentionally empty - no whitelist handling is implemented here

        }
106
        /**
         * Placeholder implementation: the body is empty, so calling it has no effect
         * and nothing is returned.
         */
        @Override
        public void listWhiteList() {
                // intentionally empty - no whitelist handling is implemented here

        }
111
112         @Override
113         public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
114                         throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
115                         CambriaApiException, IOException {
116
117                 
118                 final HttpServletRequest req = ctx.getRequest();
119                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
120
121                 // was this host blacklisted?
122                 final String remoteAddr = Utils.getRemoteAddress(ctx);
123                 
124                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
125
126                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
127                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
128                                         "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
129                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
130                                         ctx.getRequest().getRemoteHost(), null, null);
131                         LOG.info(errRes.toString());
132                         throw new CambriaApiException(errRes);
133                 }
134
135                 int limit = CambriaConstants.kNoLimit;
136
137                 if (req.getParameter("limit") != null) {
138                         limit = Integer.parseInt(req.getParameter("limit"));
139                 }
140                 limit = 1;
141                 
142                 int timeoutMs = CambriaConstants.kNoTimeout;
143                 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
144                 if (strtimeoutMS != null)
145                         timeoutMs = Integer.parseInt(strtimeoutMS);
146                 // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
147                 
148                 if (req.getParameter("timeout") != null) {
149                         timeoutMs = Integer.parseInt(req.getParameter("timeout"));
150                 }
151
152                 // By default no filter is applied if filter is not passed as a
153                 // parameter in the request URI
154                 String topicFilter = CambriaConstants.kNoFilter;
155                 if (null != req.getParameter("filter")) {
156                         topicFilter = req.getParameter("filter");
157                 }
158                 // pretty to print the messaages in new line
159                 String prettyval = "0";
160                 String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
161                 if (null != strPretty)
162                         prettyval = strPretty;
163
164                 String metaval = "0";
165                 String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
166                 if (null != strmeta)
167                         metaval = strmeta;
168
169                 final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
170                 // withMeta to print offset along with message
171                 final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
172
173                 // is this user allowed to read this topic?
174                 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
175                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
176
177                 if (metatopic == null) {
178                         // no such topic.
179                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
180                                         DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
181                                         errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
182                                         topic, null, null, clientId, ctx.getRequest().getRemoteHost());
183                         LOG.info(errRes.toString());
184                         throw new CambriaApiException(errRes);
185                 }
186                 //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
187                 /*
188                  * if (null==metricTopicname)
189                  * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null)
190                  * if(null==ctx.getRequest().getHeader("Authorization")&&
191                  * !topic.equalsIgnoreCase(metricTopicname)) { if (null !=
192                  * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check
193                  * permissions metatopic.checkUserRead(user); } }
194                  */
195
196                 Consumer c = null;
197                 try {
198                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
199
200                         c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,ctx.getRequest().getRemoteHost());
201
202                         final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
203                                         .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
204                         coes.setDmaapContext(ctx);
205                         coes.setTopic(metatopic);
206
207                         DMaaPResponseBuilder.setNoCacheHeadings(ctx);
208
209                         try {
210                                 coes.write(baos);
211                         } catch (Exception ex) {
212
213                         }
214
215                         c.commitOffsets();
216                         final int sent = coes.getSentCount();
217
218                         metricsSet.consumeTick(sent);
219
220                 } catch (UnavailableException excp) {
221
222                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
223                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
224                                         errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
225                                         null, null, clientId, ctx.getRequest().getRemoteHost());
226                         LOG.info(errRes.toString());
227                         throw new CambriaApiException(errRes);
228
229                 } catch (CambriaApiException excp) {
230
231                         throw excp;
232                 } catch (Exception excp) {
233
234                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
235
236                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
237                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
238                                         "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
239                                         Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
240                         LOG.info(errRes.toString());
241                         throw new CambriaApiException(errRes);
242                 } finally {
243
244                         boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
245                         String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
246                                         ConsumerFactory.kSetting_EnableCache);
247                         if (null != strkSetting_EnableCache)
248                                 kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
249
250                         if (!kSetting_EnableCache && (c != null)) {
251                                 c.close();
252
253                         }
254                 }
255                 return baos.toString();
256         }
257
258         @Override
259         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
260                         final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
261                                         CambriaApiException, IOException, missingReqdSetting {
262
263                 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
264                 //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
265
266                 final String remoteAddr = Utils.getRemoteAddress(ctx);
267
268                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
269
270                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
271                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
272                                         "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
273                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
274                                         ctx.getRequest().getRemoteHost(), null, null);
275                         LOG.info(errRes.toString());
276                         throw new CambriaApiException(errRes);
277                 }
278
279                 String topicNameStd = null;
280
281                 topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
282                                 "enforced.topic.name.AAF");
283                 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
284                                 "metrics.send.cambria.topic");
285                 if (null == metricTopicname)
286                         metricTopicname = "msgrtr.apinode.metrics.dmaap";
287                 boolean topicNameEnforced = false;
288                 if (null != topicNameStd && topic.startsWith(topicNameStd)) {
289                         topicNameEnforced = true;
290                 }
291
292                 final HttpServletRequest req = ctx.getRequest();
293
294                 boolean chunked = false;
295                 if (null != req.getHeader(TRANSFER_ENCODING)) {
296                         chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
297                 }
298
299                 String mediaType = req.getContentType();
300                 if (mediaType == null || mediaType.length() == 0) {
301                         mediaType = MimeTypes.kAppGenericBinary;
302                 }
303
304                 if (mediaType.contains("charset=UTF-8")) {
305                         mediaType = mediaType.replace("; charset=UTF-8", "").trim();
306                 }
307
308                 if (!topic.equalsIgnoreCase(metricTopicname)) {
309                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
310                 } else {
311                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
312                 }
313         }
314
315         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
316                         final String messageCreationTime, final int messageSequence, final Long batchId,
317                         final boolean transactionEnabled) {
318                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
319                                 transactionEnabled);
320                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
321                 msg.setTransactionEnabled(transactionEnabled);
322                 msg.setLogDetails(logDetails);
323         }
324
325         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
326                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
327                 LogDetails logDetails = new LogDetails();
328                 logDetails.setTopicId(topicName);
329                 logDetails.setMessageTimestamp(messageTimestamp);
330                 logDetails.setPublisherId(Utils.getUserApiKey(request));
331                 logDetails.setPublisherIp(request.getRemoteHost());
332                 logDetails.setMessageBatchId(batchId);
333                 logDetails.setMessageSequence(String.valueOf(messageSequence));
334                 logDetails.setTransactionEnabled(transactionEnabled);
335                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
336                 logDetails.setServerIp(request.getLocalAddr());
337                 return logDetails;
338         }
339
340         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
341                         String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
342                                         CambriaApiException, IOException {
343                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
344
345                 // setup the event set
346                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
347
348                 // start processing, building a batch to push to the backend
349                 final long startMs = System.currentTimeMillis();
350                 long count = 0;
351
352                 long maxEventBatch = 1024 * 16;
353                 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
354                 if (null != batchlen)
355                         maxEventBatch = Long.parseLong(batchlen);
356
357                 // long maxEventBatch =
358                 // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
359                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
360                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
361                 //final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
362
363                 try {
364                         // for each message...
365                         Publisher.message m = null;
366                         while ((m = events.next()) != null) {
367                                 // add the message to the batch
368                                 batch.add(m);
369                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
370                                                 m.getMessage());
371                                 // check if the batch is full
372                                 final int sizeNow = batch.size();
373                                 if (sizeNow > maxEventBatch) {
374                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
375                                         pms.clear();
376                                         batch.clear();
377                                         metricsSet.publishTick(sizeNow);
378                                         count += sizeNow;
379                                 }
380                         }
381
382                         // send the pending batch
383                         final int sizeNow = batch.size();
384                         if (sizeNow > 0) {
385                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
386                                 pms.clear();
387                                 batch.clear();
388                                 metricsSet.publishTick(sizeNow);
389                                 count += sizeNow;
390                         }
391
392                         final long endMs = System.currentTimeMillis();
393                         final long totalMs = endMs - startMs;
394
395                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
396
397                         // build a responseP
398                         final JSONObject response = new JSONObject();
399                         response.put("count", count);
400                         response.put("serverTimeMs", totalMs);
401                         // DMaaPResponseBuilder.respondOk(ctx, response);
402
403                 } catch (Exception excp) {
404
405                         int status = HttpStatus.SC_NOT_FOUND;
406                         String errorMsg = null;
407                         if (excp.getClass().toString().contains("CambriaApiException")) {
408                                 status = ((CambriaApiException) excp).getStatus();
409                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
410                                 JSONObject errObject = new JSONObject(jsonTokener);
411                                 errorMsg = (String) errObject.get("message");
412
413                         }
414                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
415                                         errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
416                                                         + "." + errorMsg,
417                                         null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
418                                         null);
419                         LOG.info(errRes.toString());
420                         throw new CambriaApiException(errRes);
421
422                 }
423         }
424
425         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
426                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
427                                         throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
428                                         CambriaApiException {
429
430                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
431
432                 // setup the event set
433                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
434
435                 // start processing, building a batch to push to the backend
436                 final long startMs = System.currentTimeMillis();
437                 long count = 0;
438                 long maxEventBatch = 1024 * 16;
439                 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
440                 if (null != evenlen)
441                         maxEventBatch = Long.parseLong(evenlen);
442
443                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
444                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
445
446                 Publisher.message m = null;
447                 int messageSequence = 1;
448                 Long batchId = 1L;
449                 final boolean transactionEnabled = true;
450                 int publishBatchCount = 0;
451                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
452
453                 // LOG.warn("Batch Start Id: " +
454                 // Utils.getFromattedBatchSequenceId(batchId));
455                 try {
456                         // for each message...
457                         batchId = DMaaPContext.getBatchID();
458
459                         String responseTransactionId = null;
460
461                         while ((m = events.next()) != null) {
462
463                                 // LOG.warn("Batch Start Id: " +
464                                 // Utils.getFromattedBatchSequenceId(batchId));
465
466                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
467                                                 transactionEnabled);
468                                 messageSequence++;
469
470                                 // add the message to the batch
471                                 batch.add(m);
472
473                                 responseTransactionId = m.getLogDetails().getTransactionId();
474
475                                 JSONObject jsonObject = new JSONObject();
476                                 jsonObject.put("message", m.getMessage());
477                                 jsonObject.put("transactionId", responseTransactionId);
478                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
479                                                 m.getMessage());
480                                 pms.add(data);
481
482                                 // check if the batch is full
483                                 final int sizeNow = batch.size();
484                                 if (sizeNow >= maxEventBatch) {
485                                         String startTime = sdf.format(new Date());
486                                         LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
487                                                         + batchId + "]");
488                                         try {
489                                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
490                                                 // transactionLogs(batch);
491                                                 for (message msg : batch) {
492                                                         LogDetails logDetails = msg.getLogDetails();
493                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
494                                                 }
495                                         } catch (Exception excp) {
496
497                                                 int status = HttpStatus.SC_NOT_FOUND;
498                                                 String errorMsg = null;
499                                                 if (excp.getClass().toString().contains("CambriaApiException")) {
500                                                         status = ((CambriaApiException) excp).getStatus();
501                                                         JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
502                                                         JSONObject errObject = new JSONObject(jsonTokener);
503                                                         errorMsg = (String) errObject.get("message");
504                                                 }
505                                                 ErrorResponse errRes = new ErrorResponse(status,
506                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
507                                                                 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
508                                                                                 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
509                                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
510                                                                 ctx.getRequest().getRemoteHost(), null, null);
511                                                 LOG.info(errRes.toString());
512                                                 throw new CambriaApiException(errRes);
513                                         }
514                                         pms.clear();
515                                         batch.clear();
516                                         metricsSet.publishTick(sizeNow);
517                                         publishBatchCount = sizeNow;
518                                         count += sizeNow;
519                                         // batchId++;
520                                         String endTime = sdf.format(new Date());
521                                         LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
522                                                         + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
523                                                         + ",Batch End Time=" + endTime + "]");
524                                         batchId = DMaaPContext.getBatchID();
525                                 }
526                         }
527
528                         // send the pending batch
529                         final int sizeNow = batch.size();
530                         if (sizeNow > 0) {
531                                 String startTime = sdf.format(new Date());
532                                 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
533                                                 + batchId + "]");
534                                 try {
535                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
536                                         // transactionLogs(batch);
537                                         for (message msg : batch) {
538                                                 LogDetails logDetails = msg.getLogDetails();
539                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
540                                         }
541                                 } catch (Exception excp) {
542                                         int status = HttpStatus.SC_NOT_FOUND;
543                                         String errorMsg = null;
544                                         if (excp.getClass().toString().contains("CambriaApiException")) {
545                                                 status = ((CambriaApiException) excp).getStatus();
546                                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
547                                                 JSONObject errObject = new JSONObject(jsonTokener);
548                                                 errorMsg = (String) errObject.get("message");
549                                         }
550
551                                         ErrorResponse errRes = new ErrorResponse(status,
552                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
553                                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
554                                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
555                                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
556                                                         ctx.getRequest().getRemoteHost(), null, null);
557                                         LOG.info(errRes.toString());
558                                         throw new CambriaApiException(errRes);
559                                 }
560                                 pms.clear();
561                                 metricsSet.publishTick(sizeNow);
562                                 count += sizeNow;
563                                 // batchId++;
564                                 String endTime = sdf.format(new Date());
565                                 publishBatchCount = sizeNow;
566                                 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
567                                                 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
568                                                 + endTime + "]");
569                         }
570
571                         final long endMs = System.currentTimeMillis();
572                         final long totalMs = endMs - startMs;
573
574                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
575
576                         // build a response
577                         final JSONObject response = new JSONObject();
578                         response.put("count", count);
579                         response.put("serverTimeMs", totalMs);
580
581                 } catch (Exception excp) {
582                         int status = HttpStatus.SC_NOT_FOUND;
583                         String errorMsg = null;
584                         if (excp.getClass().toString().contains("CambriaApiException")) {
585                                 status = ((CambriaApiException) excp).getStatus();
586                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
587                                 JSONObject errObject = new JSONObject(jsonTokener);
588                                 errorMsg = (String) errObject.get("message");
589                         }
590
591                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
592                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
593                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
594                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
595                                         ctx.getRequest().getRemoteHost(), null, null);
596                         LOG.info(errRes.toString());
597                         throw new CambriaApiException(errRes);
598                 }
599         }
600 }