f25867dd5afb2426876038026aedfab7209b0001
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / EventsServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.math.NumberUtils;
45 import org.apache.http.HttpStatus;
46 import org.apache.kafka.clients.producer.ProducerRecord;
47 import org.apache.kafka.common.errors.TopicExistsException;
48 import org.json.JSONObject;
49 import org.json.JSONTokener;
50 import org.onap.dmaap.dmf.mr.CambriaApiException;
51 import org.onap.dmaap.dmf.mr.backends.Consumer;
52 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
54 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
55 import org.onap.dmaap.dmf.mr.backends.Publisher;
56 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
57 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
59 import org.onap.dmaap.dmf.mr.beans.LogDetails;
60 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
61 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
64 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
65 import org.onap.dmaap.dmf.mr.metabroker.Topic;
66 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
67 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
68 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.service.EventsService;
72 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
74 import org.onap.dmaap.dmf.mr.utils.Utils;
75 import org.springframework.beans.factory.annotation.Autowired;
76 import org.springframework.stereotype.Service;
77
78 /**
79  * This class provides the functionality to publish messages to and subscribe to
80  * messages from Kafka
81  * 
82  * @author Ramkumar Sembaiyam
83  *
84  */
85 @Service
86 public class EventsServiceImpl implements EventsService {
87         
        // Logger shared by all instances of this service.
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
        // Property key read by the publish paths to override the default maximum batch size.
        private static final String BATCH_LENGTH = "event.batch.length";
        // Request header inspected to detect a chunked request body.
        private static final String TRANSFER_ENCODING = "Transfer-Encoding";
        // Name used both for the timeout configuration property and the timeout request parameter.
        private static final String TIMEOUT_PROPERTY = "timeout";
        // Action names handed to authorizeClientWhenNeeded for consuming and publishing.
        private static final String SUBSCRIBE_ACTION = "sub";
        private static final String PUBLISH_ACTION = "pub";

        // Supplies the message texts used when error responses are built; injected by Spring.
        @Autowired
        private DMaaPErrorMessages errorMessages;
97
98         String getPropertyFromAJSCmap(String propertyKey) {
99                 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
100         }
101
102         public DMaaPErrorMessages getErrorMessages() {
103                 return errorMessages;
104         }
105
106         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
107                 this.errorMessages = errorMessages;
108         }
109
110         /**
111          * @param ctx
112          * @param topic
113          * @param consumerGroup
114          * @param clientId
115          * @throws ConfigDbException,
116          *             TopicExistsException, AccessDeniedException,
117          *             UnavailableException, CambriaApiException, IOException
118          * 
119          * 
120          */
121         @Override
122         public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
123                         throws ConfigDbException, AccessDeniedException, UnavailableException,
124                         CambriaApiException, IOException {
125
126                 final long startTime = System.currentTimeMillis();
127                 final HttpServletRequest req = ctx.getRequest();
128                 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
129                 final String remoteHost = req.getRemoteHost();
130                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
131                         .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
132
133                 validateIpBlacklist(errRespProvider, ctx);
134
135                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
136                 if (metaTopic == null) {
137                         throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
138                 }
139
140                 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
141
142                 final long elapsedMs1 = System.currentTimeMillis() - startTime;
143                 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
144                                 + " " + clientId);
145
146                 verifyHostId();
147                 final boolean pretty = isPrettyPrintEnabled();
148                 final boolean withMeta = isMetaOffsetEnabled();
149                 int timeoutMs = getMessageTimeout(req);
150                 int limit = getMessageLimit(req);
151                 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
152                 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
153
154                 Consumer consumer = null;
155                 try {
156                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
157                         final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
158                         rl.onCall(topic, consumerGroup, clientId, remoteHost);
159                         consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
160                                         remoteHost);
161                         CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
162                                         .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
163                         coes.setDmaapContext(ctx);
164                         coes.setTopic(metaTopic);
165                         coes.setTransEnabled(isTransEnabled() || isAAFTopic);
166                         coes.setTopicStyle(isAAFTopic);
167                         final long elapsedMs2 = System.currentTimeMillis() - startTime;
168                         logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
169                                         + consumerGroup + " " + clientId);
170
171                         respondOkWithStream(ctx, coes);
172                         // No IOException thrown during respondOkWithStream, so commit the
173                         // new offsets to all the brokers
174                         consumer.commitOffsets();
175                         final int sent = coes.getSentCount();
176                         metricsSet.consumeTick(sent);
177                         rl.onSend(topic, consumerGroup, clientId, sent);
178                         final long elapsedMs = System.currentTimeMillis() - startTime;
179                         logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
180                                         + topic + " " + consumerGroup + " " + clientId + " on to the server "
181                                         + remoteHost);
182
183                 } catch (UnavailableException excp) {
184                         logger.warn(excp.getMessage(), excp);
185                         ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
186                         LOG.info(errRes.toString());
187                         throw new CambriaApiException(errRes);
188
189                 } catch (ConcurrentModificationException excp1) {
190                         LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
191                         ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
192                         logger.info(errRes.toString());
193                         throw new CambriaApiException(errRes);
194                         
195                 } catch (Exception excp) {
196                         logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
197                                         + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
198                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
199                         ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
200                         logger.info(errRes.toString());
201                         throw new CambriaApiException(errRes);
202                 } finally {
203                         if (consumer != null && !isCacheEnabled()) {
204                                 try {
205                                         consumer.close();
206                                 } catch (Exception e) {
207                                         logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
208                                                         + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
209                                                         + " " + e);
210                                 }
211                         }
212                 }
213         }
214
215         private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
216                 final String remoteAddr = Utils.getRemoteAddress(ctx);
217                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
218                         ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
219                         LOG.info(errRes.toString());
220                         throw new CambriaApiException(errRes);
221                 }
222         }
223
224         private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
225                 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
226
227                 boolean isAAFTopic = false;
228                 String metricTopicName = getMetricTopicName();
229                 if(!metricTopicName.equalsIgnoreCase(topicName)) {
230                         if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
231                                 isAAFTopic = true;
232                                 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
233                                 String permission = aaf.aafPermissionString(topicName, action);
234                                 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
235                                         ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
236                                         LOG.info(errRes.toString());
237                                         throw new DMaaPAccessDeniedException(errRes);
238
239                                 }
240                         } else if( null!=metaTopic&& null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
241                                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
242                                 if(SUBSCRIBE_ACTION.equals(action)) {
243                                         metaTopic.checkUserRead(user);
244                                 } else if(PUBLISH_ACTION.equals(action)) {
245                                         metaTopic.checkUserWrite(user);
246                                 }
247                         }
248                 }
249                 return isAAFTopic;
250         }
251
252         boolean isCadiEnabled() {
253                 return Utils.isCadiEnabled();
254         }
255
256         void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
257                 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
258                 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
259         }
260
261         private int getMessageLimit(HttpServletRequest request) {
262                 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
263         }
264
265         private int getMessageTimeout(HttpServletRequest request) {
266                 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
267                 int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
268                         CambriaConstants.kNoTimeout;
269
270                 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
271                 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
272         }
273
274         private boolean isPrettyPrintEnabled() {
275                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
276         }
277
278         private boolean isMetaOffsetEnabled() {
279                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
280         }
281
282         private boolean isTopicNameEnforcedAaf(String topicName) {
283                 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
284                 return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd);
285         }
286
287         private boolean isCacheEnabled() {
288                 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
289                 return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
290         }
291
292         private void verifyHostId() {
293                 String lhostId = getPropertyFromAJSCmap("clusterhostid");
294                 if (lhostId.isEmpty()) {
295                         try {
296                                 InetAddress.getLocalHost().getCanonicalHostName();
297                         } catch (UnknownHostException e) {
298                                 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
299                         }
300
301                 }
302         }
303
304         private String getMetricTopicName() {
305                 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
306                 return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
307         }
308
309         /**
310          * @throws missingReqdSetting
311          * 
312          */
313         @Override
314         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
315                         final String requestTime) throws ConfigDbException, AccessDeniedException,
316                         CambriaApiException, IOException, missingReqdSetting {
317
318                 final long startMs = System.currentTimeMillis();
319                 String remoteHost = ctx.getRequest().getRemoteHost();
320                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
321                         .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
322                         .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
323
324                 validateIpBlacklist(errRespProvider, ctx);
325
326                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
327                 
328
329                 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
330
331                 final HttpServletRequest req = ctx.getRequest();
332                 boolean chunked = isRequestedChunk(req);
333                 String mediaType = getMediaType(req);
334                 boolean transactionRequired = isTransactionIdRequired();
335
336                 if (isAAFTopic || transactionRequired) {
337                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
338                 } else {
339                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
340                 }
341
342                 final long endMs = System.currentTimeMillis();
343                 final long totalMs = endMs - startMs;
344                 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
345         }
346
347         private boolean isRequestedChunk(HttpServletRequest request) {
348                 return null != request.getHeader(TRANSFER_ENCODING) &&
349                         request.getHeader(TRANSFER_ENCODING).contains("chunked");
350         }
351
352         private String getMediaType(HttpServletRequest request) {
353                 String mediaType = request.getContentType();
354                 if (mediaType == null || mediaType.length() == 0) {
355                         return MimeTypes.kAppGenericBinary;
356                 }
357                 return mediaType.replace("; charset=UTF-8", "").trim();
358         }
359
360         private boolean isTransactionIdRequired() {
361                 return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true");
362         }
363
364         /**
365          * 
366          * @param ctx
367          * @param topic
368          * @param msg
369          * @param defaultPartition
370          * @param chunked
371          * @param mediaType
372          * @throws ConfigDbException
373          * @throws AccessDeniedException
374          * @throws TopicExistsException
375          * @throws CambriaApiException
376          * @throws IOException
377          */
378         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
379                         String mediaType)
380                         throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
381                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
382                 // setup the event set
383                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
384
385                 // start processing, building a batch to push to the backend
386                 final long startMs = System.currentTimeMillis();
387                 long count = 0;
388                 long maxEventBatch = 1024L* 16;
389                 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
390                 if (null != batchlen && !batchlen.isEmpty())
391                         maxEventBatch = Long.parseLong(batchlen);
392                 // long maxEventBatch =
393                 
394                 final LinkedList<Publisher.message> batch = new LinkedList<>();
395                 // final ArrayList<KeyedMessage<String, String>> kms = new
396         
397                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
398                 try {
399                         // for each message...
400                         Publisher.message m = null;
401                         while ((m = events.next()) != null) {
402                                 // add the message to the batch
403                                 batch.add(m);
404                                 // final KeyedMessage<String, String> data = new
405                                 // KeyedMessage<String, String>(topic, m.getKey(),
406                         
407                                 // kms.add(data);
408                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
409                                                 m.getMessage());
410
411                                 pms.add(data);
412                                 // check if the batch is full
413                                 final int sizeNow = batch.size();
414                                 if (sizeNow > maxEventBatch) {
415                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
416                                 
417                                         // kms.clear();
418                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
419                                         pms.clear();
420                                         batch.clear();
421                                         metricsSet.publishTick(sizeNow);
422                                         count += sizeNow;
423                                 }
424                         }
425
426                         // send the pending batch
427                         final int sizeNow = batch.size();
428                         if (sizeNow > 0) {
429                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
430                         
431                                 // kms.clear();
432                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
433                                 pms.clear();
434                                 batch.clear();
435                                 metricsSet.publishTick(sizeNow);
436                                 count += sizeNow;
437                         }
438
439                         final long endMs = System.currentTimeMillis();
440                         final long totalMs = endMs - startMs;
441
442                         LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
443                                         + ctx.getRequest().getRemoteHost());
444
445                         // build a responseP
446                         final JSONObject response = new JSONObject();
447                         response.put("count", count);
448                         response.put("serverTimeMs", totalMs);
449                         respondOk(ctx, response);
450
451                 } catch (Exception excp) {
452                         int status = HttpStatus.SC_NOT_FOUND;
453                         String errorMsg = null;
454                         if (excp instanceof CambriaApiException) {
455                                 status = ((CambriaApiException) excp).getStatus();
456                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
457                                 JSONObject errObject = new JSONObject(jsonTokener);
458                                 errorMsg = (String) errObject.get("message");
459
460                         }
461                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
462                                         errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
463                                                         + "." + errorMsg,
464                                         null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
465                                         null);
466                         LOG.info(errRes.toString());
467                         throw new CambriaApiException(errRes);
468
469                 }
470         }
471
472         /**
473          * 
474          * @param ctx
475          * @param inputStream
476          * @param topic
477          * @param partitionKey
478          * @param requestTime
479          * @param chunked
480          * @param mediaType
481          * @throws ConfigDbException
482          * @throws AccessDeniedException
483          * @throws TopicExistsException
484          * @throws IOException
485          * @throws CambriaApiException
486          */
487         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
488                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
489                         throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
490
491                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
492
493                 // setup the event set
494                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
495
496                 // start processing, building a batch to push to the backend
497                 final long startMs = System.currentTimeMillis();
498                 long count = 0;
499                 long maxEventBatch = 1024L * 16;
500                 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
501                 if (null != evenlen && !evenlen.isEmpty())
502                         maxEventBatch = Long.parseLong(evenlen);
503                 // final long maxEventBatch =
504                 
505                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
506                 // final ArrayList<KeyedMessage<String, String>> kms = new
507                 
508                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
509                 Publisher.message m = null;
510                 int messageSequence = 1;
511                 Long batchId = 1L;
512                 final boolean transactionEnabled = true;
513                 int publishBatchCount = 0;
514                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
515
516                 // LOG.warn("Batch Start Id: " +
517         
518                 try {
519                         // for each message...
520                         batchId = DMaaPContext.getBatchID();
521
522                         String responseTransactionId = null;
523
524                         while ((m = events.next()) != null) {
525
526                                 // LOG.warn("Batch Start Id: " +
527                                 
528
529                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
530                                                 transactionEnabled);
531                                 messageSequence++;
532
533                         
534                                 batch.add(m);
535
536                                 responseTransactionId = m.getLogDetails().getTransactionId();
537
538                                 //JSONObject jsonObject = new JSONObject();
539                                 //jsonObject.put("msgWrapMR", m.getMessage());
540                                 //jsonObject.put("transactionId", responseTransactionId);
541                                 // final KeyedMessage<String, String> data = new
542                                 // KeyedMessage<String, String>(topic, m.getKey(),
543                         
544                                 
545                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
546                                                 m.getMessage());
547
548                                 pms.add(data);
549                                 // check if the batch is full
550                                 final int sizeNow = batch.size();
551                                 if (sizeNow >= maxEventBatch) {
552                                         String startTime = sdf.format(new Date());
553                                         LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
554                                                         + batchId + "]");
555                                         try {
556                                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
557                                         
558                                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
559                                                 // transactionLogs(batch);
560                                                 for (message msg : batch) {
561                                                         LogDetails logDetails = msg.getLogDetails();
562                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
563                                                 }
564                                         } catch (Exception excp) {
565
566                                                 int status = HttpStatus.SC_NOT_FOUND;
567                                                 String errorMsg = null;
568                                                 if (excp instanceof CambriaApiException) {
569                                                         status = ((CambriaApiException) excp).getStatus();
570                                                         JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
571                                                         JSONObject errObject = new JSONObject(jsonTokener);
572                                                         errorMsg = (String) errObject.get("message");
573                                                 }
574                                                 ErrorResponse errRes = new ErrorResponse(status,
575                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
576                                                                 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
577                                                                                 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
578                                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
579                                                                 ctx.getRequest().getRemoteHost(), null, null);
580                                                 LOG.info(errRes.toString());
581                                                 throw new CambriaApiException(errRes);
582                                         }
583                                         pms.clear();
584                                         batch.clear();
585                                         metricsSet.publishTick(sizeNow);
586                                         publishBatchCount = sizeNow;
587                                         count += sizeNow;
588                                         
589                                         String endTime = sdf.format(new Date());
590                                         LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
591                                                         + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
592                                                         + ",Batch End Time=" + endTime + "]");
593                                         batchId = DMaaPContext.getBatchID();
594                                 }
595                         }
596
597                         // send the pending batch
598                         final int sizeNow = batch.size();
599                         if (sizeNow > 0) {
600                                 String startTime = sdf.format(new Date());
601                                 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
602                                                 + batchId + "]");
603                                 try {
604                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
605                                         
606                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
607                                         
608                                         for (message msg : batch) {
609                                                 LogDetails logDetails = msg.getLogDetails();
610                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
611                                         }
612                                 } catch (Exception excp) {
613                                         int status = HttpStatus.SC_NOT_FOUND;
614                                         String errorMsg = null;
615                                         if (excp instanceof CambriaApiException) {
616                                                 status = ((CambriaApiException) excp).getStatus();
617                                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
618                                                 JSONObject errObject = new JSONObject(jsonTokener);
619                                                 errorMsg = (String) errObject.get("message");
620                                         }
621
622                                         ErrorResponse errRes = new ErrorResponse(status,
623                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
624                                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
625                                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
626                                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
627                                                         ctx.getRequest().getRemoteHost(), null, null);
628                                         LOG.info(errRes.toString());
629                                         throw new CambriaApiException(errRes);
630                                 }
631                                 pms.clear();
632                                 metricsSet.publishTick(sizeNow);
633                                 count += sizeNow;
634                         
635                                 String endTime = sdf.format(new Date());
636                                 publishBatchCount = sizeNow;
637                                 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
638                                                 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
639                                                 + endTime + "]");
640                         }
641
642                         final long endMs = System.currentTimeMillis();
643                         final long totalMs = endMs - startMs;
644
645                         LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
646
647                         if (null != responseTransactionId) {
648                                 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
649                         }
650
651                         // build a response
652                         final JSONObject response = new JSONObject();
653                         response.put("count", count);
654                         response.put("transactionId", responseTransactionId);
655                         response.put("serverTimeMs", totalMs);
656                         respondOk(ctx, response);
657
658                 } catch (Exception excp) {
659                         int status = HttpStatus.SC_NOT_FOUND;
660                         String errorMsg = null;
661                         if (excp instanceof CambriaApiException) {
662                                 status = ((CambriaApiException) excp).getStatus();
663                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
664                                 JSONObject errObject = new JSONObject(jsonTokener);
665                                 errorMsg = (String) errObject.get("message");
666                         }
667
668                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
669                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
670                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
671                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
672                                         ctx.getRequest().getRemoteHost(), null, null);
673                         LOG.info(errRes.toString());
674                         throw new CambriaApiException(errRes);
675                 }
676         }
677
678         /**
679          * 
680          * @param msg
681          * @param topic
682          * @param request
683          * @param messageCreationTime
684          * @param messageSequence
685          * @param batchId
686          * @param transactionEnabled
687          */
688         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
689                         final String messageCreationTime, final int messageSequence, final Long batchId,
690                         final boolean transactionEnabled) {
691                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
692                                 transactionEnabled);
693                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
694                 msg.setTransactionEnabled(transactionEnabled);
695                 msg.setLogDetails(logDetails);
696         }
697
698         void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
699                 DMaaPResponseBuilder.respondOk(ctx, response);
700         }
701
702         /**
703          * 
704          * @author anowarul.islam
705          *
706          */
707         private static class LogWrap {
708                 private final String fId;
709
710                 /**
711                  * constructor initialization
712                  * 
713                  * @param topic
714                  * @param cgroup
715                  * @param cid
716                  */
717                 public LogWrap(String topic, String cgroup, String cid) {
718                         fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
719                 }
720
721                 /**
722                  * 
723                  * @param msg
724                  */
725                 public void info(String msg) {
726                         LOG.info(fId + msg);
727                 }
728
729                 /**
730                  * 
731                  * @param msg
732                  * @param t
733                  */
734                 public void warn(String msg, Exception t) {
735                         LOG.warn(fId + msg, t);
736                 }
737
738         }
739
740         public boolean isTransEnabled() {
741                 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
742                 boolean istransidreqd = false;
743                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
744                         istransidreqd = true;
745                 }
746
747                 return istransidreqd;
748
749         }
750
751         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
752                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
753                 LogDetails logDetails = new LogDetails();
754                 logDetails.setTopicId(topicName);
755                 logDetails.setMessageTimestamp(messageTimestamp);
756                 logDetails.setPublisherId(Utils.getUserApiKey(request));
757                 logDetails.setPublisherIp(request.getRemoteHost());
758                 logDetails.setMessageBatchId(batchId);
759                 logDetails.setMessageSequence(String.valueOf(messageSequence));
760                 logDetails.setTransactionEnabled(transactionEnabled);
761                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
762                 logDetails.setServerIp(request.getLocalAddr());
763                 return logDetails;
764         }
765
766
767 }