fix the failing job
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / EventsServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.StringUtils;
45 import org.apache.commons.lang.math.NumberUtils;
46 import org.apache.http.HttpStatus;
47 import org.apache.kafka.clients.producer.ProducerRecord;
48 import org.apache.kafka.common.errors.TopicExistsException;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51 import org.onap.dmaap.dmf.mr.CambriaApiException;
52 import org.onap.dmaap.dmf.mr.backends.Consumer;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
54 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
55 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
56 import org.onap.dmaap.dmf.mr.backends.Publisher;
57 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
59 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
60 import org.onap.dmaap.dmf.mr.beans.LogDetails;
61 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
64 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
65 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
66 import org.onap.dmaap.dmf.mr.metabroker.Topic;
67 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
68 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
72 import org.onap.dmaap.dmf.mr.service.EventsService;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
74 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
75 import org.onap.dmaap.dmf.mr.utils.Utils;
76 import org.springframework.beans.factory.annotation.Autowired;
77 import org.springframework.stereotype.Service;
78
79 /**
 * This class provides the functionality to publish messages to, and subscribe
 * to messages from, Kafka.
82  * 
83  * @author Ramkumar Sembaiyam
84  *
85  */
86 @Service
87 public class EventsServiceImpl implements EventsService {
88         
89         private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
90         private static final String BATCH_LENGTH = "event.batch.length";
91         private static final String TRANSFER_ENCODING = "Transfer-Encoding";
92         private static final String TIMEOUT_PROPERTY = "timeout";
93         private static final String SUBSCRIBE_ACTION = "sub";
94         private static final String PUBLISH_ACTION = "pub";
95
96         @Autowired
97         private DMaaPErrorMessages errorMessages;
98
99         String getPropertyFromAJSCmap(String propertyKey) {
100                 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
101         }
102
103         public DMaaPErrorMessages getErrorMessages() {
104                 return errorMessages;
105         }
106
107         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
108                 this.errorMessages = errorMessages;
109         }
110
111         /**
112          * @param ctx
113          * @param topic
114          * @param consumerGroup
115          * @param clientId
116          * @throws ConfigDbException,
117          *             TopicExistsException, AccessDeniedException,
118          *             UnavailableException, CambriaApiException, IOException
119          * 
120          * 
121          */
122         @Override
123         public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
124                         throws ConfigDbException, AccessDeniedException, UnavailableException,
125                         CambriaApiException, IOException {
126
127                 final long startTime = System.currentTimeMillis();
128                 final HttpServletRequest req = ctx.getRequest();
129                 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
130                 final String remoteHost = req.getRemoteHost();
131                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
132                         .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
133
134                 validateIpBlacklist(errRespProvider, ctx);
135
136                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
137                 if (metaTopic == null) {
138                         throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
139                 }
140
141                 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
142
143                 final long elapsedMs1 = System.currentTimeMillis() - startTime;
144                 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
145                                 + " " + clientId);
146
147                 verifyHostId();
148                 final boolean pretty = isPrettyPrintEnabled();
149                 final boolean withMeta = isMetaOffsetEnabled();
150                 int timeoutMs = getMessageTimeout(req);
151                 int limit = getMessageLimit(req);
152                 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
153                 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
154
155                 Consumer consumer = null;
156                 try {
157                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
158                         final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
159                         rl.onCall(topic, consumerGroup, clientId, remoteHost);
160                         consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
161                                         remoteHost);
162                         CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
163                                         .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
164                         coes.setDmaapContext(ctx);
165                         coes.setTopic(metaTopic);
166                         coes.setTransEnabled(isTransEnabled() || isAAFTopic);
167                         coes.setTopicStyle(isAAFTopic);
168                         final long elapsedMs2 = System.currentTimeMillis() - startTime;
169                         logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
170                                         + consumerGroup + " " + clientId);
171
172                         respondOkWithStream(ctx, coes);
173                         // No IOException thrown during respondOkWithStream, so commit the
174                         // new offsets to all the brokers
175                         consumer.commitOffsets();
176                         final int sent = coes.getSentCount();
177                         metricsSet.consumeTick(sent);
178                         rl.onSend(topic, consumerGroup, clientId, sent);
179                         final long elapsedMs = System.currentTimeMillis() - startTime;
180                         logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
181                                         + topic + " " + consumerGroup + " " + clientId + " on to the server "
182                                         + remoteHost);
183
184                 } catch (UnavailableException excp) {
185                         logger.warn(excp.getMessage(), excp);
186                         ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
187                         LOG.info(errRes.toString());
188                         throw new CambriaApiException(errRes);
189
190                 } catch (ConcurrentModificationException excp1) {
191                         LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
192                         ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
193                         logger.info(errRes.toString());
194                         throw new CambriaApiException(errRes);
195                         
196                 } catch (Exception excp) {
197                         logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
198                                         + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
199                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
200                         ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
201                         logger.info(errRes.toString());
202                         throw new CambriaApiException(errRes);
203                 } finally {
204                         if (consumer != null && !isCacheEnabled()) {
205                                 try {
206                                         consumer.close();
207                                 } catch (Exception e) {
208                                         logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
209                                                         + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
210                                                         + " " + e);
211                                 }
212                         }
213                 }
214         }
215
216         private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
217                 final String remoteAddr = Utils.getRemoteAddress(ctx);
218                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
219                         ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
220                         LOG.info(errRes.toString());
221                         throw new CambriaApiException(errRes);
222                 }
223         }
224
225         private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
226                 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
227
228                 boolean isAAFTopic = false;
229                 String metricTopicName = getMetricTopicName();
230                 if(!metricTopicName.equalsIgnoreCase(topicName)) {
231                         if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
232                                 isAAFTopic = true;
233                                 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
234                                 String permission = aaf.aafPermissionString(topicName, action);
235                                 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
236                                         ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
237                                         LOG.info(errRes.toString());
238                                         throw new DMaaPAccessDeniedException(errRes);
239
240                                 }
241                         } else if(metaTopic!=null && null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
242                                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
243                                 if(SUBSCRIBE_ACTION.equals(action)) {
244                                         metaTopic.checkUserRead(user);
245                                 } else if(PUBLISH_ACTION.equals(action)) {
246                                         metaTopic.checkUserWrite(user);
247                                 }
248                         }
249                 }
250                 return isAAFTopic;
251         }
252
253         boolean isCadiEnabled() {
254                 return Utils.isCadiEnabled();
255         }
256
257         void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
258                 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
259                 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
260         }
261
262         private int getMessageLimit(HttpServletRequest request) {
263                 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
264         }
265
266         private int getMessageTimeout(HttpServletRequest request) {
267                 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
268                 int defaultTimeoutMs = StringUtils.isNotEmpty(timeoutMsAsString) ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
269                         CambriaConstants.kNoTimeout;
270
271                 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
272                 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
273         }
274
275         private boolean isPrettyPrintEnabled() {
276                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
277         }
278
279         private boolean isMetaOffsetEnabled() {
280                 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
281         }
282
283         private boolean isTopicNameEnforcedAaf(String topicName) {
284                 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
285                 return StringUtils.isNotEmpty(topicNameStd) && topicName.startsWith(topicNameStd);
286         }
287
288         private boolean isCacheEnabled() {
289                 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
290                 return StringUtils.isNotEmpty(cachePropsSetting) ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
291         }
292
293         private void verifyHostId() {
294                 String lhostId = getPropertyFromAJSCmap("clusterhostid");
295                 if (StringUtils.isEmpty(lhostId)) {
296                         try {
297                                 InetAddress.getLocalHost().getCanonicalHostName();
298                         } catch (UnknownHostException e) {
299                                 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
300                         }
301
302                 }
303         }
304
305         private String getMetricTopicName() {
306                 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
307                 return StringUtils.isNotEmpty(metricTopicFromProps) ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
308         }
309
310         /**
311          * @throws missingReqdSetting
312          * 
313          */
314         @Override
315         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
316                         final String requestTime) throws ConfigDbException, AccessDeniedException,
317                         CambriaApiException, IOException, missingReqdSetting {
318
319                 final long startMs = System.currentTimeMillis();
320                 String remoteHost = ctx.getRequest().getRemoteHost();
321                 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
322                         .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
323                         .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
324
325                 validateIpBlacklist(errRespProvider, ctx);
326
327                 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
328
329                 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
330
331                 final HttpServletRequest req = ctx.getRequest();
332                 boolean chunked = isRequestedChunk(req);
333                 String mediaType = getMediaType(req);
334                 boolean transactionRequired = isTransactionIdRequired();
335
336                 if (isAAFTopic || transactionRequired) {
337                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
338                 } else {
339                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
340                 }
341
342                 final long endMs = System.currentTimeMillis();
343                 final long totalMs = endMs - startMs;
344                 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
345         }
346
347         private boolean isRequestedChunk(HttpServletRequest request) {
348                 return null != request.getHeader(TRANSFER_ENCODING) &&
349                         request.getHeader(TRANSFER_ENCODING).contains("chunked");
350         }
351
352         private String getMediaType(HttpServletRequest request) {
353                 String mediaType = request.getContentType();
354                 if (mediaType == null || mediaType.length() == 0) {
355                         return MimeTypes.kAppGenericBinary;
356                 }
357                 return mediaType.replace("; charset=UTF-8", "").trim();
358         }
359
360         private boolean isTransactionIdRequired() {
361                 String transIdReqProperty = getPropertyFromAJSCmap("transidUEBtopicreqd");
362                 return StringUtils.isNotEmpty(transIdReqProperty) && transIdReqProperty.equalsIgnoreCase("true");
363         }
364
365         /**
366          * 
367          * @param ctx
368          * @param topic
369          * @param msg
370          * @param defaultPartition
371          * @param chunked
372          * @param mediaType
373          * @throws ConfigDbException
374          * @throws AccessDeniedException
375          * @throws TopicExistsException
376          * @throws CambriaApiException
377          * @throws IOException
378          */
379         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
380                         String mediaType)
381                         throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
382                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
383                 // setup the event set
384                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
385
386                 // start processing, building a batch to push to the backend
387                 final long startMs = System.currentTimeMillis();
388                 long count = 0;
389                 long maxEventBatch = 1024L* 16;
390                 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
391                 if (null != batchlen && !batchlen.isEmpty())
392                         maxEventBatch = Long.parseLong(batchlen);
393                 // long maxEventBatch =
394                 
395                 final LinkedList<Publisher.message> batch = new LinkedList<>();
396                 // final ArrayList<KeyedMessage<String, String>> kms = new
397         
398                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
399                 try {
400                         // for each message...
401                         Publisher.message m = null;
402                         while ((m = events.next()) != null) {
403                                 // add the message to the batch
404                                 batch.add(m);
405                                 // final KeyedMessage<String, String> data = new
406                                 // KeyedMessage<String, String>(topic, m.getKey(),
407                         
408                                 // kms.add(data);
409                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
410                                                 m.getMessage());
411
412                                 pms.add(data);
413                                 // check if the batch is full
414                                 final int sizeNow = batch.size();
415                                 if (sizeNow > maxEventBatch) {
416                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
417                                 
418                                         // kms.clear();
419                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
420                                         pms.clear();
421                                         batch.clear();
422                                         metricsSet.publishTick(sizeNow);
423                                         count += sizeNow;
424                                 }
425                         }
426
427                         // send the pending batch
428                         final int sizeNow = batch.size();
429                         if (sizeNow > 0) {
430                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
431                         
432                                 // kms.clear();
433                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
434                                 pms.clear();
435                                 batch.clear();
436                                 metricsSet.publishTick(sizeNow);
437                                 count += sizeNow;
438                         }
439
440                         final long endMs = System.currentTimeMillis();
441                         final long totalMs = endMs - startMs;
442
443                         LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
444                                         + ctx.getRequest().getRemoteHost());
445
446                         // build a responseP
447                         final JSONObject response = new JSONObject();
448                         response.put("count", count);
449                         response.put("serverTimeMs", totalMs);
450                         respondOk(ctx, response);
451
452                 } catch (Exception excp) {
453                         int status = HttpStatus.SC_NOT_FOUND;
454                         String errorMsg = null;
455                         if (excp instanceof CambriaApiException) {
456                                 status = ((CambriaApiException) excp).getStatus();
457                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
458                                 JSONObject errObject = new JSONObject(jsonTokener);
459                                 errorMsg = (String) errObject.get("message");
460
461                         }
462                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
463                                         errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
464                                                         + "." + errorMsg,
465                                         null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
466                                         null);
467                         LOG.info(errRes.toString());
468                         throw new CambriaApiException(errRes);
469
470                 }
471         }
472
473         /**
474          * 
475          * @param ctx
476          * @param inputStream
477          * @param topic
478          * @param partitionKey
479          * @param requestTime
480          * @param chunked
481          * @param mediaType
482          * @throws ConfigDbException
483          * @throws AccessDeniedException
484          * @throws TopicExistsException
485          * @throws IOException
486          * @throws CambriaApiException
487          */
488         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
489                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
490                         throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
491
492                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
493
494                 // setup the event set
495                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
496
497                 // start processing, building a batch to push to the backend
498                 final long startMs = System.currentTimeMillis();
499                 long count = 0;
500                 long maxEventBatch = 1024L * 16;
501                 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
502                 if (null != evenlen && !evenlen.isEmpty())
503                         maxEventBatch = Long.parseLong(evenlen);
504                 // final long maxEventBatch =
505                 
506                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
507                 // final ArrayList<KeyedMessage<String, String>> kms = new
508                 
509                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
510                 Publisher.message m = null;
511                 int messageSequence = 1;
512                 Long batchId = 1L;
513                 final boolean transactionEnabled = true;
514                 int publishBatchCount = 0;
515                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
516
517                 // LOG.warn("Batch Start Id: " +
518         
519                 try {
520                         // for each message...
521                         batchId = DMaaPContext.getBatchID();
522
523                         String responseTransactionId = null;
524
525                         while ((m = events.next()) != null) {
526
527                                 // LOG.warn("Batch Start Id: " +
528                                 
529
530                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
531                                                 transactionEnabled);
532                                 messageSequence++;
533
534                         
535                                 batch.add(m);
536
537                                 responseTransactionId = m.getLogDetails().getTransactionId();
538
539                                 //JSONObject jsonObject = new JSONObject();
540                                 //jsonObject.put("msgWrapMR", m.getMessage());
541                                 //jsonObject.put("transactionId", responseTransactionId);
542                                 // final KeyedMessage<String, String> data = new
543                                 // KeyedMessage<String, String>(topic, m.getKey(),
544                         
545                                 
546                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
547                                                 m.getMessage());
548
549                                 pms.add(data);
550                                 // check if the batch is full
551                                 final int sizeNow = batch.size();
552                                 if (sizeNow >= maxEventBatch) {
553                                         String startTime = sdf.format(new Date());
554                                         LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
555                                                         + batchId + "]");
556                                         try {
557                                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
558                                         
559                                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
560                                                 // transactionLogs(batch);
561                                                 for (message msg : batch) {
562                                                         LogDetails logDetails = msg.getLogDetails();
563                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
564                                                 }
565                                         } catch (Exception excp) {
566
567                                                 int status = HttpStatus.SC_NOT_FOUND;
568                                                 String errorMsg = null;
569                                                 if (excp instanceof CambriaApiException) {
570                                                         status = ((CambriaApiException) excp).getStatus();
571                                                         JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
572                                                         JSONObject errObject = new JSONObject(jsonTokener);
573                                                         errorMsg = (String) errObject.get("message");
574                                                 }
575                                                 ErrorResponse errRes = new ErrorResponse(status,
576                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
577                                                                 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
578                                                                                 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
579                                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
580                                                                 ctx.getRequest().getRemoteHost(), null, null);
581                                                 LOG.info(errRes.toString());
582                                                 throw new CambriaApiException(errRes);
583                                         }
584                                         pms.clear();
585                                         batch.clear();
586                                         metricsSet.publishTick(sizeNow);
587                                         publishBatchCount = sizeNow;
588                                         count += sizeNow;
589                                         
590                                         String endTime = sdf.format(new Date());
591                                         LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
592                                                         + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
593                                                         + ",Batch End Time=" + endTime + "]");
594                                         batchId = DMaaPContext.getBatchID();
595                                 }
596                         }
597
598                         // send the pending batch
599                         final int sizeNow = batch.size();
600                         if (sizeNow > 0) {
601                                 String startTime = sdf.format(new Date());
602                                 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
603                                                 + batchId + "]");
604                                 try {
605                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
606                                         
607                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
608                                         
609                                         for (message msg : batch) {
610                                                 LogDetails logDetails = msg.getLogDetails();
611                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
612                                         }
613                                 } catch (Exception excp) {
614                                         int status = HttpStatus.SC_NOT_FOUND;
615                                         String errorMsg = null;
616                                         if (excp instanceof CambriaApiException) {
617                                                 status = ((CambriaApiException) excp).getStatus();
618                                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
619                                                 JSONObject errObject = new JSONObject(jsonTokener);
620                                                 errorMsg = (String) errObject.get("message");
621                                         }
622
623                                         ErrorResponse errRes = new ErrorResponse(status,
624                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
625                                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
626                                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
627                                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
628                                                         ctx.getRequest().getRemoteHost(), null, null);
629                                         LOG.info(errRes.toString());
630                                         throw new CambriaApiException(errRes);
631                                 }
632                                 pms.clear();
633                                 metricsSet.publishTick(sizeNow);
634                                 count += sizeNow;
635                         
636                                 String endTime = sdf.format(new Date());
637                                 publishBatchCount = sizeNow;
638                                 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
639                                                 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
640                                                 + endTime + "]");
641                         }
642
643                         final long endMs = System.currentTimeMillis();
644                         final long totalMs = endMs - startMs;
645
646                         LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
647
648                         if (null != responseTransactionId) {
649                                 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
650                         }
651
652                         // build a response
653                         final JSONObject response = new JSONObject();
654                         response.put("count", count);
655                         response.put("transactionId", responseTransactionId);
656                         response.put("serverTimeMs", totalMs);
657                         respondOk(ctx, response);
658
659                 } catch (Exception excp) {
660                         int status = HttpStatus.SC_NOT_FOUND;
661                         String errorMsg = null;
662                         if (excp instanceof CambriaApiException) {
663                                 status = ((CambriaApiException) excp).getStatus();
664                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
665                                 JSONObject errObject = new JSONObject(jsonTokener);
666                                 errorMsg = (String) errObject.get("message");
667                         }
668
669                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
670                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
671                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
672                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
673                                         ctx.getRequest().getRemoteHost(), null, null);
674                         LOG.info(errRes.toString());
675                         throw new CambriaApiException(errRes);
676                 }
677         }
678
679         /**
680          * 
681          * @param msg
682          * @param topic
683          * @param request
684          * @param messageCreationTime
685          * @param messageSequence
686          * @param batchId
687          * @param transactionEnabled
688          */
689         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
690                         final String messageCreationTime, final int messageSequence, final Long batchId,
691                         final boolean transactionEnabled) {
692                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
693                                 transactionEnabled);
694                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
695                 msg.setTransactionEnabled(transactionEnabled);
696                 msg.setLogDetails(logDetails);
697         }
698
699         void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
700                 DMaaPResponseBuilder.respondOk(ctx, response);
701         }
702
703         /**
704          * 
705          * @author anowarul.islam
706          *
707          */
708         private static class LogWrap {
709                 private final String fId;
710
711                 /**
712                  * constructor initialization
713                  * 
714                  * @param topic
715                  * @param cgroup
716                  * @param cid
717                  */
718                 public LogWrap(String topic, String cgroup, String cid) {
719                         fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
720                 }
721
722                 /**
723                  * 
724                  * @param msg
725                  */
726                 public void info(String msg) {
727                         LOG.info(fId + msg);
728                 }
729
730                 /**
731                  * 
732                  * @param msg
733                  * @param t
734                  */
735                 public void warn(String msg, Exception t) {
736                         LOG.warn(fId + msg, t);
737                 }
738
739         }
740
741         public boolean isTransEnabled() {
742                 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
743                 boolean istransidreqd = false;
744                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
745                         istransidreqd = true;
746                 }
747
748                 return istransidreqd;
749
750         }
751
752         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
753                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
754                 LogDetails logDetails = new LogDetails();
755                 logDetails.setTopicId(topicName);
756                 logDetails.setMessageTimestamp(messageTimestamp);
757                 logDetails.setPublisherId(Utils.getUserApiKey(request));
758                 logDetails.setPublisherIp(request.getRemoteHost());
759                 logDetails.setMessageBatchId(batchId);
760                 logDetails.setMessageSequence(String.valueOf(messageSequence));
761                 logDetails.setTransactionEnabled(transactionEnabled);
762                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
763                 logDetails.setServerIp(request.getLocalAddr());
764                 return logDetails;
765         }
766
767
768 }