DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / MMServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
31 import com.att.nsa.util.rrConvertor;
32 import org.apache.http.HttpStatus;
33 import org.apache.kafka.clients.producer.ProducerRecord;
34 import org.json.JSONObject;
35 import org.json.JSONTokener;
36 import org.onap.dmaap.dmf.mr.CambriaApiException;
37 import org.onap.dmaap.dmf.mr.backends.Consumer;
38 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
39 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
40 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
41 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
42 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
43 import org.onap.dmaap.dmf.mr.beans.LogDetails;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
47 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
48 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
49 import org.onap.dmaap.dmf.mr.metabroker.Topic;
50 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
51 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
52 import org.onap.dmaap.dmf.mr.service.MMService;
53 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
54 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import org.springframework.beans.factory.annotation.Autowired;
57 import org.springframework.beans.factory.annotation.Qualifier;
58 import org.springframework.stereotype.Service;
59
60 import javax.servlet.http.HttpServletRequest;
61 import javax.servlet.http.HttpServletResponse;
62 import javax.ws.rs.core.Context;
63 import java.io.ByteArrayOutputStream;
64 import java.io.IOException;
65 import java.io.InputStream;
66 import java.text.SimpleDateFormat;
67 import java.util.ArrayList;
68 import java.util.Date;
69 import java.util.LinkedList;
70
71
72 @Service
73 public class MMServiceImpl implements MMService {
74         private static final String BATCH_LENGTH = "event.batch.length";
75         private static final String TRANSFER_ENCODING = "Transfer-Encoding";
76         //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class);
77         private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class);
78         @Autowired
79         private DMaaPErrorMessages errorMessages;
80
81         @Autowired
82         @Qualifier("configurationReader")
83         private ConfigurationReader configReader;
84
85         // HttpServletRequest object
86         @Context
87         private HttpServletRequest request;
88
89         // HttpServletResponse object
90         @Context
91         private HttpServletResponse response;
92
93         @Override
94         public void addWhiteList() {
95
96         }
97
98         @Override
99         public void removeWhiteList() {
100
101         }
102
103         @Override
104         public void listWhiteList() {
105
106         }
107
108         @Override
109         public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
110                         throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
111                         CambriaApiException, IOException {
112
113                 
114                 final HttpServletRequest req = ctx.getRequest();
115                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
116
117                 // was this host blacklisted?
118                 final String remoteAddr = Utils.getRemoteAddress(ctx);
119                 
120                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
121
122                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
123                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
124                                         "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
125                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
126                                         ctx.getRequest().getRemoteHost(), null, null);
127                         LOG.info(errRes.toString());
128                         throw new CambriaApiException(errRes);
129                 }
130
131                 int limit = CambriaConstants.kNoLimit;
132
133                 if (req.getParameter("limit") != null) {
134                         limit = Integer.parseInt(req.getParameter("limit"));
135                 }
136                 limit = 1;
137                 
138                 int timeoutMs = CambriaConstants.kNoTimeout;
139                 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
140                 if (strtimeoutMS != null)
141                         timeoutMs = Integer.parseInt(strtimeoutMS);
142                 // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
143                 
144                 if (req.getParameter("timeout") != null) {
145                         timeoutMs = Integer.parseInt(req.getParameter("timeout"));
146                 }
147
148                 // By default no filter is applied if filter is not passed as a
149                 // parameter in the request URI
150                 String topicFilter = CambriaConstants.kNoFilter;
151                 if (null != req.getParameter("filter")) {
152                         topicFilter = req.getParameter("filter");
153                 }
154                 // pretty to print the messaages in new line
155                 String prettyval = "0";
156                 String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
157                 if (null != strPretty)
158                         prettyval = strPretty;
159
160                 String metaval = "0";
161                 String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
162                 if (null != strmeta)
163                         metaval = strmeta;
164
165                 final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
166                 // withMeta to print offset along with message
167                 final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
168
169                 // is this user allowed to read this topic?
170                 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
171                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
172
173                 if (metatopic == null) {
174                         // no such topic.
175                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
176                                         DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
177                                         errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
178                                         topic, null, null, clientId, ctx.getRequest().getRemoteHost());
179                         LOG.info(errRes.toString());
180                         throw new CambriaApiException(errRes);
181                 }
182                 //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
183                 /*
184                  * if (null==metricTopicname)
185                  * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null)
186                  * if(null==ctx.getRequest().getHeader("Authorization")&&
187                  * !topic.equalsIgnoreCase(metricTopicname)) { if (null !=
188                  * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check
189                  * permissions metatopic.checkUserRead(user); } }
190                  */
191
192                 Consumer c = null;
193                 try {
194                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
195
196                         c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,ctx.getRequest().getRemoteHost());
197
198                         final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
199                                         .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
200                         coes.setDmaapContext(ctx);
201                         coes.setTopic(metatopic);
202
203                         DMaaPResponseBuilder.setNoCacheHeadings(ctx);
204
205                         try {
206                                 coes.write(baos);
207                         } catch (Exception ex) {
208
209                         }
210
211                         c.commitOffsets();
212                         final int sent = coes.getSentCount();
213
214                         metricsSet.consumeTick(sent);
215
216                 } catch (UnavailableException excp) {
217
218                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
219                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
220                                         errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
221                                         null, null, clientId, ctx.getRequest().getRemoteHost());
222                         LOG.info(errRes.toString());
223                         throw new CambriaApiException(errRes);
224
225                 } catch (CambriaApiException excp) {
226
227                         throw excp;
228                 } catch (Exception excp) {
229
230                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
231
232                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
233                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
234                                         "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
235                                         Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
236                         LOG.info(errRes.toString());
237                         throw new CambriaApiException(errRes);
238                 } finally {
239
240                         boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
241                         String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
242                                         ConsumerFactory.kSetting_EnableCache);
243                         if (null != strkSetting_EnableCache)
244                                 kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
245
246                         if (!kSetting_EnableCache && (c != null)) {
247                                 c.close();
248
249                         }
250                 }
251                 return baos.toString();
252         }
253
254         @Override
255         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
256                         final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
257                                         CambriaApiException, IOException, missingReqdSetting {
258
259                 //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
260                 //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
261
262                 final String remoteAddr = Utils.getRemoteAddress(ctx);
263
264                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
265
266                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
267                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
268                                         "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
269                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
270                                         ctx.getRequest().getRemoteHost(), null, null);
271                         LOG.info(errRes.toString());
272                         throw new CambriaApiException(errRes);
273                 }
274
275                 String topicNameStd = null;
276
277                 topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
278                                 "enforced.topic.name.AAF");
279                 String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
280                                 "metrics.send.cambria.topic");
281                 if (null == metricTopicname)
282                         metricTopicname = "msgrtr.apinode.metrics.dmaap";
283                 boolean topicNameEnforced = false;
284                 if (null != topicNameStd && topic.startsWith(topicNameStd)) {
285                         topicNameEnforced = true;
286                 }
287
288                 final HttpServletRequest req = ctx.getRequest();
289
290                 boolean chunked = false;
291                 if (null != req.getHeader(TRANSFER_ENCODING)) {
292                         chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
293                 }
294
295                 String mediaType = req.getContentType();
296                 if (mediaType == null || mediaType.length() == 0) {
297                         mediaType = MimeTypes.kAppGenericBinary;
298                 }
299
300                 if (mediaType.contains("charset=UTF-8")) {
301                         mediaType = mediaType.replace("; charset=UTF-8", "").trim();
302                 }
303
304                 if (!topic.equalsIgnoreCase(metricTopicname)) {
305                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
306                 } else {
307                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
308                 }
309         }
310
311         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
312                         final String messageCreationTime, final int messageSequence, final Long batchId,
313                         final boolean transactionEnabled) {
314                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
315                                 transactionEnabled);
316                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
317                 msg.setTransactionEnabled(transactionEnabled);
318                 msg.setLogDetails(logDetails);
319         }
320
321         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
322                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
323                 LogDetails logDetails = new LogDetails();
324                 logDetails.setTopicId(topicName);
325                 logDetails.setMessageTimestamp(messageTimestamp);
326                 logDetails.setPublisherId(Utils.getUserApiKey(request));
327                 logDetails.setPublisherIp(request.getRemoteHost());
328                 logDetails.setMessageBatchId(batchId);
329                 logDetails.setMessageSequence(String.valueOf(messageSequence));
330                 logDetails.setTransactionEnabled(transactionEnabled);
331                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
332                 logDetails.setServerIp(request.getLocalAddr());
333                 return logDetails;
334         }
335
336         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
337                         String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
338                                         CambriaApiException, IOException {
339                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
340
341                 // setup the event set
342                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
343
344                 // start processing, building a batch to push to the backend
345                 final long startMs = System.currentTimeMillis();
346                 long count = 0;
347
348                 long maxEventBatch = 1024L * 16;
349                 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
350                 if (null != batchlen)
351                         maxEventBatch = Long.parseLong(batchlen);
352
353                 // long maxEventBatch =
354                 // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
355                 final LinkedList<message> batch = new LinkedList<message>();
356                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
357                 //final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
358
359                 try {
360                         // for each message...
361                         message m = null;
362                         while ((m = events.next()) != null) {
363                                 // add the message to the batch
364                                 batch.add(m);
365                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
366                                                 m.getMessage());
367                                 // check if the batch is full
368                                 final int sizeNow = batch.size();
369                                 if (sizeNow > maxEventBatch) {
370                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
371                                         pms.clear();
372                                         batch.clear();
373                                         metricsSet.publishTick(sizeNow);
374                                         count += sizeNow;
375                                 }
376                         }
377
378                         // send the pending batch
379                         final int sizeNow = batch.size();
380                         if (sizeNow > 0) {
381                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
382                                 pms.clear();
383                                 batch.clear();
384                                 metricsSet.publishTick(sizeNow);
385                                 count += sizeNow;
386                         }
387
388                         final long endMs = System.currentTimeMillis();
389                         final long totalMs = endMs - startMs;
390
391                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
392
393                         // build a responseP
394                         final JSONObject response = new JSONObject();
395                         response.put("count", count);
396                         response.put("serverTimeMs", totalMs);
397                         // DMaaPResponseBuilder.respondOk(ctx, response);
398
399                 } catch (Exception excp) {
400
401                         int status = HttpStatus.SC_NOT_FOUND;
402                         String errorMsg = null;
403                         if (excp.getClass().toString().contains("CambriaApiException")) {
404                                 status = ((CambriaApiException) excp).getStatus();
405                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
406                                 JSONObject errObject = new JSONObject(jsonTokener);
407                                 errorMsg = (String) errObject.get("message");
408
409                         }
410                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
411                                         errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
412                                                         + "." + errorMsg,
413                                         null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
414                                         null);
415                         LOG.info(errRes.toString());
416                         throw new CambriaApiException(errRes);
417
418                 }
419         }
420
421         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
422                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
423                                         throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
424                                         CambriaApiException {
425
426                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
427
428                 // setup the event set
429                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
430
431                 // start processing, building a batch to push to the backend
432                 final long startMs = System.currentTimeMillis();
433                 long count = 0;
434         long maxEventBatch = 1024L * 16L;
435                 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
436                 if (null != evenlen)
437                         maxEventBatch = Long.parseLong(evenlen);
438
439                 final LinkedList<message> batch = new LinkedList<message>();
440                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
441
442                 message m = null;
443                 int messageSequence = 1;
444                 Long batchId = 1L;
445                 final boolean transactionEnabled = true;
446                 int publishBatchCount = 0;
447                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
448
449                 // LOG.warn("Batch Start Id: " +
450                 // Utils.getFromattedBatchSequenceId(batchId));
451                 try {
452                         // for each message...
453                         batchId = DMaaPContext.getBatchID();
454
455                         String responseTransactionId = null;
456
457                         while ((m = events.next()) != null) {
458
459                                 // LOG.warn("Batch Start Id: " +
460                                 // Utils.getFromattedBatchSequenceId(batchId));
461
462                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
463                                                 transactionEnabled);
464                                 messageSequence++;
465
466                                 // add the message to the batch
467                                 batch.add(m);
468
469                                 responseTransactionId = m.getLogDetails().getTransactionId();
470
471                                 JSONObject jsonObject = new JSONObject();
472                                 jsonObject.put("message", m.getMessage());
473                                 jsonObject.put("transactionId", responseTransactionId);
474                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
475                                                 m.getMessage());
476                                 pms.add(data);
477
478                                 // check if the batch is full
479                                 final int sizeNow = batch.size();
480                                 if (sizeNow >= maxEventBatch) {
481                                         String startTime = sdf.format(new Date());
482                                         LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
483                                                         + batchId + "]");
484                                         try {
485                                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
486                                                 // transactionLogs(batch);
487                                                 for (message msg : batch) {
488                                                         LogDetails logDetails = msg.getLogDetails();
489                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
490                                                 }
491                                         } catch (Exception excp) {
492
493                                                 int status = HttpStatus.SC_NOT_FOUND;
494                                                 String errorMsg = null;
495                                                 if (excp.getClass().toString().contains("CambriaApiException")) {
496                                                         status = ((CambriaApiException) excp).getStatus();
497                                                         JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
498                                                         JSONObject errObject = new JSONObject(jsonTokener);
499                                                         errorMsg = (String) errObject.get("message");
500                                                 }
501                                                 ErrorResponse errRes = new ErrorResponse(status,
502                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
503                                                                 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
504                                                                                 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
505                                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
506                                                                 ctx.getRequest().getRemoteHost(), null, null);
507                                                 LOG.info(errRes.toString());
508                                                 throw new CambriaApiException(errRes);
509                                         }
510                                         pms.clear();
511                                         batch.clear();
512                                         metricsSet.publishTick(sizeNow);
513                                         publishBatchCount = sizeNow;
514                                         count += sizeNow;
515                                         // batchId++;
516                                         String endTime = sdf.format(new Date());
517                                         LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
518                                                         + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
519                                                         + ",Batch End Time=" + endTime + "]");
520                                         batchId = DMaaPContext.getBatchID();
521                                 }
522                         }
523
524                         // send the pending batch
525                         final int sizeNow = batch.size();
526                         if (sizeNow > 0) {
527                                 String startTime = sdf.format(new Date());
528                                 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
529                                                 + batchId + "]");
530                                 try {
531                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
532                                         // transactionLogs(batch);
533                                         for (message msg : batch) {
534                                                 LogDetails logDetails = msg.getLogDetails();
535                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
536                                         }
537                                 } catch (Exception excp) {
538                                         int status = HttpStatus.SC_NOT_FOUND;
539                                         String errorMsg = null;
540                                         if (excp.getClass().toString().contains("CambriaApiException")) {
541                                                 status = ((CambriaApiException) excp).getStatus();
542                                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
543                                                 JSONObject errObject = new JSONObject(jsonTokener);
544                                                 errorMsg = (String) errObject.get("message");
545                                         }
546
547                                         ErrorResponse errRes = new ErrorResponse(status,
548                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
549                                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
550                                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
551                                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
552                                                         ctx.getRequest().getRemoteHost(), null, null);
553                                         LOG.info(errRes.toString());
554                                         throw new CambriaApiException(errRes);
555                                 }
556                                 pms.clear();
557                                 metricsSet.publishTick(sizeNow);
558                                 count += sizeNow;
559                                 // batchId++;
560                                 String endTime = sdf.format(new Date());
561                                 publishBatchCount = sizeNow;
562                                 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
563                                                 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
564                                                 + endTime + "]");
565                         }
566
567                         final long endMs = System.currentTimeMillis();
568                         final long totalMs = endMs - startMs;
569
570                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
571
572                         // build a response
573                         final JSONObject response = new JSONObject();
574                         response.put("count", count);
575                         response.put("serverTimeMs", totalMs);
576
577                 } catch (Exception excp) {
578                         int status = HttpStatus.SC_NOT_FOUND;
579                         String errorMsg = null;
580                         if (excp.getClass().toString().contains("CambriaApiException")) {
581                                 status = ((CambriaApiException) excp).getStatus();
582                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
583                                 JSONObject errObject = new JSONObject(jsonTokener);
584                                 errorMsg = (String) errObject.get("message");
585                         }
586
587                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
588                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
589                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
590                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
591                                         ctx.getRequest().getRemoteHost(), null, null);
592                         LOG.info(errRes.toString());
593                         throw new CambriaApiException(errRes);
594                 }
595         }
596 }