ec5bfc09e0493c300cb2bda4336453172cf10710
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / EventsServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.math.NumberUtils;
45 import org.apache.http.HttpStatus;
46 import org.apache.kafka.clients.producer.ProducerRecord;
47 import org.apache.kafka.common.errors.TopicExistsException;
48 import org.json.JSONObject;
49 import org.json.JSONTokener;
50 import org.onap.dmaap.dmf.mr.CambriaApiException;
51 import org.onap.dmaap.dmf.mr.backends.Consumer;
52 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
54 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
55 import org.onap.dmaap.dmf.mr.backends.Publisher;
56 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
57 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
59 import org.onap.dmaap.dmf.mr.beans.LogDetails;
60 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
61 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
64 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
65 import org.onap.dmaap.dmf.mr.metabroker.Topic;
66 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
67 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
68 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.service.EventsService;
72 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
74 import org.onap.dmaap.dmf.mr.utils.Utils;
75 import org.springframework.beans.factory.annotation.Autowired;
76 import org.springframework.stereotype.Service;
77
78 /**
79  * This class provides the functinality to publish and subscribe message to
80  * kafka
81  * 
82  * @author Ramkumar Sembaiyam
83  *
84  */
85 @Service
86 public class EventsServiceImpl implements EventsService {
87         
88         private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
89         private static final String BATCH_LENGTH = "event.batch.length";
90         private static final String TRANSFER_ENCODING = "Transfer-Encoding";
91         private static final String TIMEOUT_PROPERTY = "timeout";
92         private static final String SUBSCRIBE_ACTION = "sub";
93         private static final String PUBLISH_ACTION = "pub";
94
95         @Autowired
96         private DMaaPErrorMessages errorMessages;
97
98         String getPropertyFromAJSCmap(String propertyKey) {
99                 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
100         }
101
102         public DMaaPErrorMessages getErrorMessages() {
103                 return errorMessages;
104         }
105
106         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
107                 this.errorMessages = errorMessages;
108         }
109
110         /**
111          * @param ctx
112          * @param topic
113          * @param consumerGroup
114          * @param clientId
115          * @throws ConfigDbException,
116          *             TopicExistsException, AccessDeniedException,
117          *             UnavailableException, CambriaApiException, IOException
118          * 
119          * 
120          */
121         @Override
122         public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
123                         throws ConfigDbException, AccessDeniedException, UnavailableException,
124                         CambriaApiException, IOException {
125
126                 final long startTime = System.currentTimeMillis();
127                 final HttpServletRequest req = ctx.getRequest();
128                 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
129                 final String remoteHost = req.getRemoteHost();
130                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
131                         .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
132
133                 validateIpBlacklist(errRespProvider, ctx);
134
135                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
136                 if (metaTopic == null) {
137                         throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
138                 }
139
140                 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
141
142                 final long elapsedMs1 = System.currentTimeMillis() - startTime;
143                 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
144                                 + " " + clientId);
145
146                 verifyHostId();
147                 final boolean pretty = isPrettyPrintEnabled();
148                 final boolean withMeta = isMetaOffsetEnabled();
149                 int timeoutMs = getMessageTimeout(req);
150                 int limit = getMessageLimit(req);
151                 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
152                 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
153
154                 Consumer consumer = null;
155                 try {
156                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
157                         final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
158                         rl.onCall(topic, consumerGroup, clientId, remoteHost);
159                         consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
160                                         remoteHost);
161                         CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
162                                         .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
163                         coes.setDmaapContext(ctx);
164                         coes.setTopic(metaTopic);
165                         coes.setTransEnabled(isTransEnabled() || isAAFTopic);
166                         coes.setTopicStyle(isAAFTopic);
167                         final long elapsedMs2 = System.currentTimeMillis() - startTime;
168                         logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
169                                         + consumerGroup + " " + clientId);
170
171                         respondOkWithStream(ctx, coes);
172                         // No IOException thrown during respondOkWithStream, so commit the
173                         // new offsets to all the brokers
174                         consumer.commitOffsets();
175                         final int sent = coes.getSentCount();
176                         metricsSet.consumeTick(sent);
177                         rl.onSend(topic, consumerGroup, clientId, sent);
178                         final long elapsedMs = System.currentTimeMillis() - startTime;
179                         logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
180                                         + topic + " " + consumerGroup + " " + clientId + " on to the server "
181                                         + remoteHost);
182
183                 } catch (UnavailableException excp) {
184                         logger.warn(excp.getMessage(), excp);
185                         ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
186                         LOG.info(errRes.toString());
187                         throw new CambriaApiException(errRes);
188
189                 } catch (ConcurrentModificationException excp1) {
190                         LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
191                         ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
192                         logger.info(errRes.toString());
193                         throw new CambriaApiException(errRes);
194                         
195                 } catch (Exception excp) {
196                         logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
197                                         + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
198                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
199                         ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
200                         logger.info(errRes.toString());
201                         throw new CambriaApiException(errRes);
202                 } finally {
203                         if (consumer != null && !isCacheEnabled()) {
204                                 try {
205                                         consumer.close();
206                                 } catch (Exception e) {
207                                         logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
208                                                         + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
209                                                         + " " + e);
210                                 }
211                         }
212                 }
213         }
214
215         private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
216                 final String remoteAddr = Utils.getRemoteAddress(ctx);
217                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
218                         ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
219                         LOG.info(errRes.toString());
220                         throw new CambriaApiException(errRes);
221                 }
222         }
223
224         private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
225                 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
226
227                 boolean isAAFTopic = false;
228                 String metricTopicName = getMetricTopicName();
229                 if(!metricTopicName.equalsIgnoreCase(topicName)) {
230                         if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
231                                 isAAFTopic = true;
232                                 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
233                                 String permission = aaf.aafPermissionString(topicName, action);
234                                 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
235                                         ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
236                                         LOG.info(errRes.toString());
237                                         throw new DMaaPAccessDeniedException(errRes);
238
239                                 }
240                         } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
241                                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
242                                 if(SUBSCRIBE_ACTION.equals(action)) {
243                                         metaTopic.checkUserRead(user);
244                                 } else if(PUBLISH_ACTION.equals(action)) {
245                                         metaTopic.checkUserWrite(user);
246                                 }
247                         }
248                 }
249                 return isAAFTopic;
250         }
251
252         boolean isCadiEnabled() {
253                 return Utils.isCadiEnabled();
254         }
255
256         void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
257                 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
258                 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
259         }
260
261         private int getMessageLimit(HttpServletRequest request) {
262                 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
263         }
264
265         private int getMessageTimeout(HttpServletRequest request) {
266                 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
267                 int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
268                         CambriaConstants.kNoTimeout;
269
270                 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
271                 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
272         }
273
274         private boolean isPrettyPrintEnabled() {
275                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
276         }
277
278         private boolean isMetaOffsetEnabled() {
279                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
280         }
281
282         private boolean isTopicNameEnforcedAaf(String topicName) {
283                 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
284                 return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd);
285         }
286
287         private boolean isCacheEnabled() {
288                 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
289                 return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
290         }
291
292         private void verifyHostId() {
293                 String lhostId = getPropertyFromAJSCmap("clusterhostid");
294                 if (lhostId.isEmpty()) {
295                         try {
296                                 InetAddress.getLocalHost().getCanonicalHostName();
297                         } catch (UnknownHostException e) {
298                                 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
299                         }
300
301                 }
302         }
303
304         private String getMetricTopicName() {
305                 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
306                 return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
307         }
308
309         /**
310          * @throws missingReqdSetting
311          * 
312          */
313         @Override
314         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
315                         final String requestTime) throws ConfigDbException, AccessDeniedException,
316                         CambriaApiException, IOException, missingReqdSetting {
317
318                 final long startMs = System.currentTimeMillis();
319                 String remoteHost = ctx.getRequest().getRemoteHost();
320                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
321                         .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
322                         .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
323
324                 validateIpBlacklist(errRespProvider, ctx);
325
326                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
327                 if (metaTopic == null) {
328                         throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
329                 }
330
331                 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
332
333                 final HttpServletRequest req = ctx.getRequest();
334                 boolean chunked = isRequestedChunk(req);
335                 String mediaType = getMediaType(req);
336                 boolean transactionRequired = isTransactionIdRequired();
337
338                 if (isAAFTopic || transactionRequired) {
339                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
340                 } else {
341                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
342                 }
343
344                 final long endMs = System.currentTimeMillis();
345                 final long totalMs = endMs - startMs;
346                 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
347         }
348
349         private boolean isRequestedChunk(HttpServletRequest request) {
350                 return null != request.getHeader(TRANSFER_ENCODING) &&
351                         request.getHeader(TRANSFER_ENCODING).contains("chunked");
352         }
353
354         private String getMediaType(HttpServletRequest request) {
355                 String mediaType = request.getContentType();
356                 if (mediaType == null || mediaType.length() == 0) {
357                         return MimeTypes.kAppGenericBinary;
358                 }
359                 return mediaType.replace("; charset=UTF-8", "").trim();
360         }
361
362         private boolean isTransactionIdRequired() {
363                 return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true");
364         }
365
366         /**
367          * 
368          * @param ctx
369          * @param topic
370          * @param msg
371          * @param defaultPartition
372          * @param chunked
373          * @param mediaType
374          * @throws ConfigDbException
375          * @throws AccessDeniedException
376          * @throws TopicExistsException
377          * @throws CambriaApiException
378          * @throws IOException
379          */
380         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
381                         String mediaType)
382                         throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
383                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
384                 // setup the event set
385                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
386
387                 // start processing, building a batch to push to the backend
388                 final long startMs = System.currentTimeMillis();
389                 long count = 0;
390                 long maxEventBatch = 1024L* 16;
391                 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
392                 if (null != batchlen && !batchlen.isEmpty())
393                         maxEventBatch = Long.parseLong(batchlen);
394                 // long maxEventBatch =
395                 
396                 final LinkedList<Publisher.message> batch = new LinkedList<>();
397                 // final ArrayList<KeyedMessage<String, String>> kms = new
398         
399                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
400                 try {
401                         // for each message...
402                         Publisher.message m = null;
403                         while ((m = events.next()) != null) {
404                                 // add the message to the batch
405                                 batch.add(m);
406                                 // final KeyedMessage<String, String> data = new
407                                 // KeyedMessage<String, String>(topic, m.getKey(),
408                         
409                                 // kms.add(data);
410                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
411                                                 m.getMessage());
412
413                                 pms.add(data);
414                                 // check if the batch is full
415                                 final int sizeNow = batch.size();
416                                 if (sizeNow > maxEventBatch) {
417                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
418                                 
419                                         // kms.clear();
420                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
421                                         pms.clear();
422                                         batch.clear();
423                                         metricsSet.publishTick(sizeNow);
424                                         count += sizeNow;
425                                 }
426                         }
427
428                         // send the pending batch
429                         final int sizeNow = batch.size();
430                         if (sizeNow > 0) {
431                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
432                         
433                                 // kms.clear();
434                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
435                                 pms.clear();
436                                 batch.clear();
437                                 metricsSet.publishTick(sizeNow);
438                                 count += sizeNow;
439                         }
440
441                         final long endMs = System.currentTimeMillis();
442                         final long totalMs = endMs - startMs;
443
444                         LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
445                                         + ctx.getRequest().getRemoteHost());
446
447                         // build a responseP
448                         final JSONObject response = new JSONObject();
449                         response.put("count", count);
450                         response.put("serverTimeMs", totalMs);
451                         respondOk(ctx, response);
452
453                 } catch (Exception excp) {
454                         int status = HttpStatus.SC_NOT_FOUND;
455                         String errorMsg = null;
456                         if (excp instanceof CambriaApiException) {
457                                 status = ((CambriaApiException) excp).getStatus();
458                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
459                                 JSONObject errObject = new JSONObject(jsonTokener);
460                                 errorMsg = (String) errObject.get("message");
461
462                         }
463                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
464                                         errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
465                                                         + "." + errorMsg,
466                                         null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
467                                         null);
468                         LOG.info(errRes.toString());
469                         throw new CambriaApiException(errRes);
470
471                 }
472         }
473
474         /**
475          * 
476          * @param ctx
477          * @param inputStream
478          * @param topic
479          * @param partitionKey
480          * @param requestTime
481          * @param chunked
482          * @param mediaType
483          * @throws ConfigDbException
484          * @throws AccessDeniedException
485          * @throws TopicExistsException
486          * @throws IOException
487          * @throws CambriaApiException
488          */
489         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
490                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
491                         throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
492
493                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
494
495                 // setup the event set
496                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
497
498                 // start processing, building a batch to push to the backend
499                 final long startMs = System.currentTimeMillis();
500                 long count = 0;
501                 long maxEventBatch = 1024L * 16;
502                 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
503                 if (null != evenlen && !evenlen.isEmpty())
504                         maxEventBatch = Long.parseLong(evenlen);
505                 // final long maxEventBatch =
506                 
507                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
508                 // final ArrayList<KeyedMessage<String, String>> kms = new
509                 
510                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
511                 Publisher.message m = null;
512                 int messageSequence = 1;
513                 Long batchId = 1L;
514                 final boolean transactionEnabled = true;
515                 int publishBatchCount = 0;
516                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
517
518                 // LOG.warn("Batch Start Id: " +
519         
520                 try {
521                         // for each message...
522                         batchId = DMaaPContext.getBatchID();
523
524                         String responseTransactionId = null;
525
526                         while ((m = events.next()) != null) {
527
528                                 // LOG.warn("Batch Start Id: " +
529                                 
530
531                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
532                                                 transactionEnabled);
533                                 messageSequence++;
534
535                         
536                                 batch.add(m);
537
538                                 responseTransactionId = m.getLogDetails().getTransactionId();
539
540                                 //JSONObject jsonObject = new JSONObject();
541                                 //jsonObject.put("msgWrapMR", m.getMessage());
542                                 //jsonObject.put("transactionId", responseTransactionId);
543                                 // final KeyedMessage<String, String> data = new
544                                 // KeyedMessage<String, String>(topic, m.getKey(),
545                         
546                                 
547                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
548                                                 m.getMessage());
549
550                                 pms.add(data);
551                                 // check if the batch is full
552                                 final int sizeNow = batch.size();
553                                 if (sizeNow >= maxEventBatch) {
554                                         String startTime = sdf.format(new Date());
555                                         LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
556                                                         + batchId + "]");
557                                         try {
558                                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
559                                         
560                                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
561                                                 // transactionLogs(batch);
562                                                 for (message msg : batch) {
563                                                         LogDetails logDetails = msg.getLogDetails();
564                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
565                                                 }
566                                         } catch (Exception excp) {
567
568                                                 int status = HttpStatus.SC_NOT_FOUND;
569                                                 String errorMsg = null;
570                                                 if (excp instanceof CambriaApiException) {
571                                                         status = ((CambriaApiException) excp).getStatus();
572                                                         JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
573                                                         JSONObject errObject = new JSONObject(jsonTokener);
574                                                         errorMsg = (String) errObject.get("message");
575                                                 }
576                                                 ErrorResponse errRes = new ErrorResponse(status,
577                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
578                                                                 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
579                                                                                 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
580                                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
581                                                                 ctx.getRequest().getRemoteHost(), null, null);
582                                                 LOG.info(errRes.toString());
583                                                 throw new CambriaApiException(errRes);
584                                         }
585                                         pms.clear();
586                                         batch.clear();
587                                         metricsSet.publishTick(sizeNow);
588                                         publishBatchCount = sizeNow;
589                                         count += sizeNow;
590                                         
591                                         String endTime = sdf.format(new Date());
592                                         LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
593                                                         + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
594                                                         + ",Batch End Time=" + endTime + "]");
595                                         batchId = DMaaPContext.getBatchID();
596                                 }
597                         }
598
599                         // send the pending batch
600                         final int sizeNow = batch.size();
601                         if (sizeNow > 0) {
602                                 String startTime = sdf.format(new Date());
603                                 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
604                                                 + batchId + "]");
605                                 try {
606                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
607                                         
608                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
609                                         
610                                         for (message msg : batch) {
611                                                 LogDetails logDetails = msg.getLogDetails();
612                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
613                                         }
614                                 } catch (Exception excp) {
615                                         int status = HttpStatus.SC_NOT_FOUND;
616                                         String errorMsg = null;
617                                         if (excp instanceof CambriaApiException) {
618                                                 status = ((CambriaApiException) excp).getStatus();
619                                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
620                                                 JSONObject errObject = new JSONObject(jsonTokener);
621                                                 errorMsg = (String) errObject.get("message");
622                                         }
623
624                                         ErrorResponse errRes = new ErrorResponse(status,
625                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
626                                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
627                                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
628                                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
629                                                         ctx.getRequest().getRemoteHost(), null, null);
630                                         LOG.info(errRes.toString());
631                                         throw new CambriaApiException(errRes);
632                                 }
633                                 pms.clear();
634                                 metricsSet.publishTick(sizeNow);
635                                 count += sizeNow;
636                         
637                                 String endTime = sdf.format(new Date());
638                                 publishBatchCount = sizeNow;
639                                 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
640                                                 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
641                                                 + endTime + "]");
642                         }
643
644                         final long endMs = System.currentTimeMillis();
645                         final long totalMs = endMs - startMs;
646
647                         LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
648
649                         if (null != responseTransactionId) {
650                                 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
651                         }
652
653                         // build a response
654                         final JSONObject response = new JSONObject();
655                         response.put("count", count);
656                         response.put("transactionId", responseTransactionId);
657                         response.put("serverTimeMs", totalMs);
658                         respondOk(ctx, response);
659
660                 } catch (Exception excp) {
661                         int status = HttpStatus.SC_NOT_FOUND;
662                         String errorMsg = null;
663                         if (excp instanceof CambriaApiException) {
664                                 status = ((CambriaApiException) excp).getStatus();
665                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
666                                 JSONObject errObject = new JSONObject(jsonTokener);
667                                 errorMsg = (String) errObject.get("message");
668                         }
669
670                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
671                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
672                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
673                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
674                                         ctx.getRequest().getRemoteHost(), null, null);
675                         LOG.info(errRes.toString());
676                         throw new CambriaApiException(errRes);
677                 }
678         }
679
680         /**
681          * 
682          * @param msg
683          * @param topic
684          * @param request
685          * @param messageCreationTime
686          * @param messageSequence
687          * @param batchId
688          * @param transactionEnabled
689          */
690         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
691                         final String messageCreationTime, final int messageSequence, final Long batchId,
692                         final boolean transactionEnabled) {
693                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
694                                 transactionEnabled);
695                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
696                 msg.setTransactionEnabled(transactionEnabled);
697                 msg.setLogDetails(logDetails);
698         }
699
700         void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
701                 DMaaPResponseBuilder.respondOk(ctx, response);
702         }
703
704         /**
705          * 
706          * @author anowarul.islam
707          *
708          */
709         private static class LogWrap {
710                 private final String fId;
711
712                 /**
713                  * constructor initialization
714                  * 
715                  * @param topic
716                  * @param cgroup
717                  * @param cid
718                  */
719                 public LogWrap(String topic, String cgroup, String cid) {
720                         fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
721                 }
722
723                 /**
724                  * 
725                  * @param msg
726                  */
727                 public void info(String msg) {
728                         LOG.info(fId + msg);
729                 }
730
731                 /**
732                  * 
733                  * @param msg
734                  * @param t
735                  */
736                 public void warn(String msg, Exception t) {
737                         LOG.warn(fId + msg, t);
738                 }
739
740         }
741
742         public boolean isTransEnabled() {
743                 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
744                 boolean istransidreqd = false;
745                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
746                         istransidreqd = true;
747                 }
748
749                 return istransidreqd;
750
751         }
752
753         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
754                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
755                 LogDetails logDetails = new LogDetails();
756                 logDetails.setTopicId(topicName);
757                 logDetails.setMessageTimestamp(messageTimestamp);
758                 logDetails.setPublisherId(Utils.getUserApiKey(request));
759                 logDetails.setPublisherIp(request.getRemoteHost());
760                 logDetails.setMessageBatchId(batchId);
761                 logDetails.setMessageSequence(String.valueOf(messageSequence));
762                 logDetails.setTransactionEnabled(transactionEnabled);
763                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
764                 logDetails.setServerIp(request.getLocalAddr());
765                 return logDetails;
766         }
767
768
769 }