Fix failing build
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / EventsServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.StringUtils;
45 import org.apache.commons.lang.math.NumberUtils;
46 import org.apache.http.HttpStatus;
47 import org.apache.kafka.clients.producer.ProducerRecord;
48 import org.apache.kafka.common.errors.TopicExistsException;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51 import org.onap.dmaap.dmf.mr.CambriaApiException;
52 import org.onap.dmaap.dmf.mr.backends.Consumer;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
54 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
55 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
56 import org.onap.dmaap.dmf.mr.backends.Publisher;
57 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
59 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
60 import org.onap.dmaap.dmf.mr.beans.LogDetails;
61 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
64 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
65 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
66 import org.onap.dmaap.dmf.mr.metabroker.Topic;
67 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
68 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
72 import org.onap.dmaap.dmf.mr.service.EventsService;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
74 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
75 import org.onap.dmaap.dmf.mr.utils.Utils;
76 import org.springframework.beans.factory.annotation.Autowired;
77 import org.springframework.stereotype.Service;
78
79 /**
80  * This class provides the functionality to publish messages to and consume
81  * messages from Kafka.
82  * 
83  * @author Ramkumar Sembaiyam
84  *
85  */
86 @Service
87 public class EventsServiceImpl implements EventsService {
88         
89         private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
90         private static final String BATCH_LENGTH = "event.batch.length";
91         private static final String TRANSFER_ENCODING = "Transfer-Encoding";
92         private static final String TIMEOUT_PROPERTY = "timeout";
93         private static final String SUBSCRIBE_ACTION = "sub";
94         private static final String PUBLISH_ACTION = "pub";
95
96         @Autowired
97         private DMaaPErrorMessages errorMessages;
98
99         String getPropertyFromAJSCmap(String propertyKey) {
100                 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
101         }
102
103         public DMaaPErrorMessages getErrorMessages() {
104                 return errorMessages;
105         }
106
107         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
108                 this.errorMessages = errorMessages;
109         }
110
111         /**
112          * @param ctx
113          * @param topic
114          * @param consumerGroup
115          * @param clientId
116          * @throws ConfigDbException,
117          *             TopicExistsException, AccessDeniedException,
118          *             UnavailableException, CambriaApiException, IOException
119          * 
120          * 
121          */
122         @Override
123         public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
124                         throws ConfigDbException, AccessDeniedException, UnavailableException,
125                         CambriaApiException, IOException {
126
127                 final long startTime = System.currentTimeMillis();
128                 final HttpServletRequest req = ctx.getRequest();
129                 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
130                 final String remoteHost = req.getRemoteHost();
131                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
132                         .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
133
134                 validateIpBlacklist(errRespProvider, ctx);
135
136                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
137                 if (metaTopic == null) {
138                         throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
139                 }
140
141                 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
142
143                 final long elapsedMs1 = System.currentTimeMillis() - startTime;
144                 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
145                                 + " " + clientId);
146
147                 verifyHostId();
148                 final boolean pretty = isPrettyPrintEnabled();
149                 final boolean withMeta = isMetaOffsetEnabled();
150                 int timeoutMs = getMessageTimeout(req);
151                 int limit = getMessageLimit(req);
152                 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
153                 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
154
155                 Consumer consumer = null;
156                 try {
157                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
158                         final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
159                         rl.onCall(topic, consumerGroup, clientId, remoteHost);
160                         consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
161                                         remoteHost);
162                         CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
163                                         .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
164                         coes.setDmaapContext(ctx);
165                         coes.setTopic(metaTopic);
166                         coes.setTransEnabled(isTransEnabled() || isAAFTopic);
167                         coes.setTopicStyle(isAAFTopic);
168                         final long elapsedMs2 = System.currentTimeMillis() - startTime;
169                         logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
170                                         + consumerGroup + " " + clientId);
171
172                         respondOkWithStream(ctx, coes);
173                         // No IOException thrown during respondOkWithStream, so commit the
174                         // new offsets to all the brokers
175                         consumer.commitOffsets();
176                         final int sent = coes.getSentCount();
177                         metricsSet.consumeTick(sent);
178                         rl.onSend(topic, consumerGroup, clientId, sent);
179                         final long elapsedMs = System.currentTimeMillis() - startTime;
180                         logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
181                                         + topic + " " + consumerGroup + " " + clientId + " on to the server "
182                                         + remoteHost);
183
184                 } catch (UnavailableException excp) {
185                         logger.warn(excp.getMessage(), excp);
186                         ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
187                         LOG.info(errRes.toString());
188                         throw new CambriaApiException(errRes);
189
190                 } catch (ConcurrentModificationException excp1) {
191                         LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
192                         ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
193                         logger.info(errRes.toString());
194                         throw new CambriaApiException(errRes);
195                         
196                 } catch (Exception excp) {
197                         logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
198                                         + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
199                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
200                         ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
201                         logger.info(errRes.toString());
202                         throw new CambriaApiException(errRes);
203                 } finally {
204                         if (consumer != null && !isCacheEnabled()) {
205                                 try {
206                                         consumer.close();
207                                 } catch (Exception e) {
208                                         logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
209                                                         + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
210                                                         + " " + e);
211                                 }
212                         }
213                 }
214         }
215
216         private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
217                 final String remoteAddr = Utils.getRemoteAddress(ctx);
218                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
219                         ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
220                         LOG.info(errRes.toString());
221                         throw new CambriaApiException(errRes);
222                 }
223         }
224
225         private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
226                 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
227
228                 boolean isAAFTopic = false;
229                 String metricTopicName = getMetricTopicName();
230                 if(!metricTopicName.equalsIgnoreCase(topicName)) {
231                         if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
232                                 isAAFTopic = true;
233                                 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
234                                 String permission = aaf.aafPermissionString(topicName, action);
235                                 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
236                                         ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
237                                         LOG.info(errRes.toString());
238                                         throw new DMaaPAccessDeniedException(errRes);
239
240                                 }
241                         } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
242                                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
243                                 if(SUBSCRIBE_ACTION.equals(action)) {
244                                         metaTopic.checkUserRead(user);
245                                 } else if(PUBLISH_ACTION.equals(action)) {
246                                         metaTopic.checkUserWrite(user);
247                                 }
248                         }
249                 }
250                 return isAAFTopic;
251         }
252
253         boolean isCadiEnabled() {
254                 return Utils.isCadiEnabled();
255         }
256
257         void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
258                 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
259                 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
260         }
261
262         private int getMessageLimit(HttpServletRequest request) {
263                 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
264         }
265
266         private int getMessageTimeout(HttpServletRequest request) {
267                 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
268                 int defaultTimeoutMs = StringUtils.isNotEmpty(timeoutMsAsString) ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
269                         CambriaConstants.kNoTimeout;
270
271                 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
272                 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
273         }
274
275         private boolean isPrettyPrintEnabled() {
276                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
277         }
278
279         private boolean isMetaOffsetEnabled() {
280                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
281         }
282
283         private boolean isTopicNameEnforcedAaf(String topicName) {
284                 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
285                 return StringUtils.isNotEmpty(topicNameStd) && topicName.startsWith(topicNameStd);
286         }
287
288         private boolean isCacheEnabled() {
289                 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
290                 return StringUtils.isNotEmpty(cachePropsSetting) ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
291         }
292
293         private void verifyHostId() {
294                 String lhostId = getPropertyFromAJSCmap("clusterhostid");
295                 if (StringUtils.isEmpty(lhostId)) {
296                         try {
297                                 InetAddress.getLocalHost().getCanonicalHostName();
298                         } catch (UnknownHostException e) {
299                                 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
300                         }
301
302                 }
303         }
304
305         private String getMetricTopicName() {
306                 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
307                 return StringUtils.isNotEmpty(metricTopicFromProps) ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
308         }
309
310         /**
311          * @throws missingReqdSetting
312          * 
313          */
314         @Override
315         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
316                         final String requestTime) throws ConfigDbException, AccessDeniedException,
317                         CambriaApiException, IOException, missingReqdSetting {
318
319                 final long startMs = System.currentTimeMillis();
320                 String remoteHost = ctx.getRequest().getRemoteHost();
321                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
322                         .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
323                         .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
324
325                 validateIpBlacklist(errRespProvider, ctx);
326
327                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
328                 if (metaTopic == null) {
329                         throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
330                 }
331
332                 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
333
334                 final HttpServletRequest req = ctx.getRequest();
335                 boolean chunked = isRequestedChunk(req);
336                 String mediaType = getMediaType(req);
337                 boolean transactionRequired = isTransactionIdRequired();
338
339                 if (isAAFTopic || transactionRequired) {
340                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
341                 } else {
342                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
343                 }
344
345                 final long endMs = System.currentTimeMillis();
346                 final long totalMs = endMs - startMs;
347                 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
348         }
349
350         private boolean isRequestedChunk(HttpServletRequest request) {
351                 return null != request.getHeader(TRANSFER_ENCODING) &&
352                         request.getHeader(TRANSFER_ENCODING).contains("chunked");
353         }
354
355         private String getMediaType(HttpServletRequest request) {
356                 String mediaType = request.getContentType();
357                 if (mediaType == null || mediaType.length() == 0) {
358                         return MimeTypes.kAppGenericBinary;
359                 }
360                 return mediaType.replace("; charset=UTF-8", "").trim();
361         }
362
363         private boolean isTransactionIdRequired() {
364                 String transIdReqProperty = getPropertyFromAJSCmap("transidUEBtopicreqd");
365                 return StringUtils.isNotEmpty(transIdReqProperty) && transIdReqProperty.equalsIgnoreCase("true");
366         }
367
368         /**
369          * 
370          * @param ctx
371          * @param topic
372          * @param msg
373          * @param defaultPartition
374          * @param chunked
375          * @param mediaType
376          * @throws ConfigDbException
377          * @throws AccessDeniedException
378          * @throws TopicExistsException
379          * @throws CambriaApiException
380          * @throws IOException
381          */
382         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
383                         String mediaType)
384                         throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
385                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
386                 // setup the event set
387                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
388
389                 // start processing, building a batch to push to the backend
390                 final long startMs = System.currentTimeMillis();
391                 long count = 0;
392                 long maxEventBatch = 1024L* 16;
393                 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
394                 if (null != batchlen && !batchlen.isEmpty())
395                         maxEventBatch = Long.parseLong(batchlen);
396                 // long maxEventBatch =
397                 
398                 final LinkedList<Publisher.message> batch = new LinkedList<>();
399                 // final ArrayList<KeyedMessage<String, String>> kms = new
400         
401                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
402                 try {
403                         // for each message...
404                         Publisher.message m = null;
405                         while ((m = events.next()) != null) {
406                                 // add the message to the batch
407                                 batch.add(m);
408                                 // final KeyedMessage<String, String> data = new
409                                 // KeyedMessage<String, String>(topic, m.getKey(),
410                         
411                                 // kms.add(data);
412                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
413                                                 m.getMessage());
414
415                                 pms.add(data);
416                                 // check if the batch is full
417                                 final int sizeNow = batch.size();
418                                 if (sizeNow > maxEventBatch) {
419                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
420                                 
421                                         // kms.clear();
422                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
423                                         pms.clear();
424                                         batch.clear();
425                                         metricsSet.publishTick(sizeNow);
426                                         count += sizeNow;
427                                 }
428                         }
429
430                         // send the pending batch
431                         final int sizeNow = batch.size();
432                         if (sizeNow > 0) {
433                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
434                         
435                                 // kms.clear();
436                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
437                                 pms.clear();
438                                 batch.clear();
439                                 metricsSet.publishTick(sizeNow);
440                                 count += sizeNow;
441                         }
442
443                         final long endMs = System.currentTimeMillis();
444                         final long totalMs = endMs - startMs;
445
446                         LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
447                                         + ctx.getRequest().getRemoteHost());
448
449                         // build a responseP
450                         final JSONObject response = new JSONObject();
451                         response.put("count", count);
452                         response.put("serverTimeMs", totalMs);
453                         respondOk(ctx, response);
454
455                 } catch (Exception excp) {
456                         int status = HttpStatus.SC_NOT_FOUND;
457                         String errorMsg = null;
458                         if (excp instanceof CambriaApiException) {
459                                 status = ((CambriaApiException) excp).getStatus();
460                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
461                                 JSONObject errObject = new JSONObject(jsonTokener);
462                                 errorMsg = (String) errObject.get("message");
463
464                         }
465                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
466                                         errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
467                                                         + "." + errorMsg,
468                                         null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
469                                         null);
470                         LOG.info(errRes.toString());
471                         throw new CambriaApiException(errRes);
472
473                 }
474         }
475
476         /**
477          * 
478          * @param ctx
479          * @param inputStream
480          * @param topic
481          * @param partitionKey
482          * @param requestTime
483          * @param chunked
484          * @param mediaType
485          * @throws ConfigDbException
486          * @throws AccessDeniedException
487          * @throws TopicExistsException
488          * @throws IOException
489          * @throws CambriaApiException
490          */
491         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
492                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
493                         throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
494
495                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
496
497                 // setup the event set
498                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
499
500                 // start processing, building a batch to push to the backend
501                 final long startMs = System.currentTimeMillis();
502                 long count = 0;
503                 long maxEventBatch = 1024L * 16;
504                 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
505                 if (null != evenlen && !evenlen.isEmpty())
506                         maxEventBatch = Long.parseLong(evenlen);
507                 // final long maxEventBatch =
508                 
509                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
510                 // final ArrayList<KeyedMessage<String, String>> kms = new
511                 
512                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
513                 Publisher.message m = null;
514                 int messageSequence = 1;
515                 Long batchId = 1L;
516                 final boolean transactionEnabled = true;
517                 int publishBatchCount = 0;
518                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
519
520                 // LOG.warn("Batch Start Id: " +
521         
522                 try {
523                         // for each message...
524                         batchId = DMaaPContext.getBatchID();
525
526                         String responseTransactionId = null;
527
528                         while ((m = events.next()) != null) {
529
530                                 // LOG.warn("Batch Start Id: " +
531                                 
532
533                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
534                                                 transactionEnabled);
535                                 messageSequence++;
536
537                         
538                                 batch.add(m);
539
540                                 responseTransactionId = m.getLogDetails().getTransactionId();
541
542                                 //JSONObject jsonObject = new JSONObject();
543                                 //jsonObject.put("msgWrapMR", m.getMessage());
544                                 //jsonObject.put("transactionId", responseTransactionId);
545                                 // final KeyedMessage<String, String> data = new
546                                 // KeyedMessage<String, String>(topic, m.getKey(),
547                         
548                                 
549                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
550                                                 m.getMessage());
551
552                                 pms.add(data);
553                                 // check if the batch is full
554                                 final int sizeNow = batch.size();
555                                 if (sizeNow >= maxEventBatch) {
556                                         String startTime = sdf.format(new Date());
557                                         LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
558                                                         + batchId + "]");
559                                         try {
560                                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
561                                         
562                                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
563                                                 // transactionLogs(batch);
564                                                 for (message msg : batch) {
565                                                         LogDetails logDetails = msg.getLogDetails();
566                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
567                                                 }
568                                         } catch (Exception excp) {
569
570                                                 int status = HttpStatus.SC_NOT_FOUND;
571                                                 String errorMsg = null;
572                                                 if (excp instanceof CambriaApiException) {
573                                                         status = ((CambriaApiException) excp).getStatus();
574                                                         JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
575                                                         JSONObject errObject = new JSONObject(jsonTokener);
576                                                         errorMsg = (String) errObject.get("message");
577                                                 }
578                                                 ErrorResponse errRes = new ErrorResponse(status,
579                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
580                                                                 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
581                                                                                 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
582                                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
583                                                                 ctx.getRequest().getRemoteHost(), null, null);
584                                                 LOG.info(errRes.toString());
585                                                 throw new CambriaApiException(errRes);
586                                         }
587                                         pms.clear();
588                                         batch.clear();
589                                         metricsSet.publishTick(sizeNow);
590                                         publishBatchCount = sizeNow;
591                                         count += sizeNow;
592                                         
593                                         String endTime = sdf.format(new Date());
594                                         LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
595                                                         + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
596                                                         + ",Batch End Time=" + endTime + "]");
597                                         batchId = DMaaPContext.getBatchID();
598                                 }
599                         }
600
601                         // send the pending batch
602                         final int sizeNow = batch.size();
603                         if (sizeNow > 0) {
604                                 String startTime = sdf.format(new Date());
605                                 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
606                                                 + batchId + "]");
607                                 try {
608                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
609                                         
610                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
611                                         
612                                         for (message msg : batch) {
613                                                 LogDetails logDetails = msg.getLogDetails();
614                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
615                                         }
616                                 } catch (Exception excp) {
617                                         int status = HttpStatus.SC_NOT_FOUND;
618                                         String errorMsg = null;
619                                         if (excp instanceof CambriaApiException) {
620                                                 status = ((CambriaApiException) excp).getStatus();
621                                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
622                                                 JSONObject errObject = new JSONObject(jsonTokener);
623                                                 errorMsg = (String) errObject.get("message");
624                                         }
625
626                                         ErrorResponse errRes = new ErrorResponse(status,
627                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
628                                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
629                                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
630                                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
631                                                         ctx.getRequest().getRemoteHost(), null, null);
632                                         LOG.info(errRes.toString());
633                                         throw new CambriaApiException(errRes);
634                                 }
635                                 pms.clear();
636                                 metricsSet.publishTick(sizeNow);
637                                 count += sizeNow;
638                         
639                                 String endTime = sdf.format(new Date());
640                                 publishBatchCount = sizeNow;
641                                 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
642                                                 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
643                                                 + endTime + "]");
644                         }
645
646                         final long endMs = System.currentTimeMillis();
647                         final long totalMs = endMs - startMs;
648
649                         LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
650
651                         if (null != responseTransactionId) {
652                                 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
653                         }
654
655                         // build a response
656                         final JSONObject response = new JSONObject();
657                         response.put("count", count);
658                         response.put("transactionId", responseTransactionId);
659                         response.put("serverTimeMs", totalMs);
660                         respondOk(ctx, response);
661
662                 } catch (Exception excp) {
663                         int status = HttpStatus.SC_NOT_FOUND;
664                         String errorMsg = null;
665                         if (excp instanceof CambriaApiException) {
666                                 status = ((CambriaApiException) excp).getStatus();
667                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
668                                 JSONObject errObject = new JSONObject(jsonTokener);
669                                 errorMsg = (String) errObject.get("message");
670                         }
671
672                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
673                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
674                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
675                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
676                                         ctx.getRequest().getRemoteHost(), null, null);
677                         LOG.info(errRes.toString());
678                         throw new CambriaApiException(errRes);
679                 }
680         }
681
682         /**
683          * 
684          * @param msg
685          * @param topic
686          * @param request
687          * @param messageCreationTime
688          * @param messageSequence
689          * @param batchId
690          * @param transactionEnabled
691          */
692         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
693                         final String messageCreationTime, final int messageSequence, final Long batchId,
694                         final boolean transactionEnabled) {
695                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
696                                 transactionEnabled);
697                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
698                 msg.setTransactionEnabled(transactionEnabled);
699                 msg.setLogDetails(logDetails);
700         }
701
        /**
         * Delegates to {@code DMaaPResponseBuilder.respondOk} with the same
         * arguments.
         *
         * @param ctx
         *            context passed through to the response builder
         * @param response
         *            JSON object passed through to the response builder
         * @throws IOException
         *             propagated from the response builder
         */
        void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
                DMaaPResponseBuilder.respondOk(ctx, response);
        }
705
706         /**
707          * 
708          * @author anowarul.islam
709          *
710          */
711         private static class LogWrap {
712                 private final String fId;
713
714                 /**
715                  * constructor initialization
716                  * 
717                  * @param topic
718                  * @param cgroup
719                  * @param cid
720                  */
721                 public LogWrap(String topic, String cgroup, String cid) {
722                         fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
723                 }
724
725                 /**
726                  * 
727                  * @param msg
728                  */
729                 public void info(String msg) {
730                         LOG.info(fId + msg);
731                 }
732
733                 /**
734                  * 
735                  * @param msg
736                  * @param t
737                  */
738                 public void warn(String msg, Exception t) {
739                         LOG.warn(fId + msg, t);
740                 }
741
742         }
743
744         public boolean isTransEnabled() {
745                 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
746                 boolean istransidreqd = false;
747                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
748                         istransidreqd = true;
749                 }
750
751                 return istransidreqd;
752
753         }
754
755         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
756                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
757                 LogDetails logDetails = new LogDetails();
758                 logDetails.setTopicId(topicName);
759                 logDetails.setMessageTimestamp(messageTimestamp);
760                 logDetails.setPublisherId(Utils.getUserApiKey(request));
761                 logDetails.setPublisherIp(request.getRemoteHost());
762                 logDetails.setMessageBatchId(batchId);
763                 logDetails.setMessageSequence(String.valueOf(messageSequence));
764                 logDetails.setTransactionEnabled(transactionEnabled);
765                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
766                 logDetails.setServerIp(request.getLocalAddr());
767                 return logDetails;
768         }
769
770
771 }