sonar critical for errorhandling
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / service / impl / EventsServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.service.impl;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.text.SimpleDateFormat;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.LinkedList;
30
31 import javax.servlet.http.HttpServletRequest;
32 import javax.ws.rs.core.MediaType;
33
34 import org.apache.http.HttpStatus;
35
36 import com.att.eelf.configuration.EELFLogger;
37 import com.att.eelf.configuration.EELFManager;
38 import org.json.JSONObject;
39 import org.json.JSONTokener;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.stereotype.Service;
42
43 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
44 import com.att.nsa.cambria.CambriaApiException;
45 import com.att.nsa.cambria.backends.Consumer;
46 import com.att.nsa.cambria.backends.ConsumerFactory;
47 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
48 import com.att.nsa.cambria.backends.MetricsSet;
49 import com.att.nsa.cambria.backends.Publisher;
50 import com.att.nsa.cambria.backends.Publisher.message;
51 import com.att.nsa.cambria.beans.DMaaPCambriaLimiter;
52 import com.att.nsa.cambria.beans.DMaaPContext;
53 import com.att.nsa.cambria.beans.LogDetails;
54 import com.att.nsa.cambria.constants.CambriaConstants;
55 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
56 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
57 import com.att.nsa.cambria.exception.DMaaPResponseCode;
58 import com.att.nsa.cambria.exception.ErrorResponse;
59 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
60 import com.att.nsa.cambria.metabroker.Topic;
61 import com.att.nsa.cambria.resources.CambriaEventSet;
62 import com.att.nsa.cambria.resources.CambriaOutboundEventStream;
63 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
64 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
65 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
66 import com.att.nsa.cambria.service.EventsService;
67 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
68 import com.att.nsa.cambria.utils.Utils;
69 import com.att.nsa.configs.ConfigDbException;
70 import com.att.nsa.drumlin.service.standards.MimeTypes;
71 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
72 import com.att.nsa.security.NsaApiKey;
73 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
74 import com.att.nsa.util.rrConvertor;
75
76 import kafka.producer.KeyedMessage;
77
78 /**
79  * This class provides the functionality to publish messages to, and consume
80  * messages from, Kafka.
81  * 
82  * @author author
83  *
84  */
85 @Service
86 public class EventsServiceImpl implements EventsService {
87         //private static final Logger LOG = Logger.getLogger(EventsServiceImpl.class);
88         private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
89
90         private static final String BATCH_LENGTH = "event.batch.length";
91         private static final String TRANSFER_ENCODING = "Transfer-Encoding";
92         @Autowired
93         private DMaaPErrorMessages errorMessages;
94
95         //@Value("${metrics.send.cambria.topic}")
96         //private String metricsTopic;
97         
98         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
99                 this.errorMessages = errorMessages;
100         }
101
102         /**
103          * @param ctx
104          * @param topic
105          * @param consumerGroup
106          * @param clientId
107          * @throws ConfigDbException,
108          *             TopicExistsException, AccessDeniedException,
109          *             UnavailableException, CambriaApiException, IOException
110          * 
111          * 
112          */
113         @Override
114         public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
115                         throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
116                         CambriaApiException, IOException,DMaaPAccessDeniedException {
117                 final long startTime = System.currentTimeMillis();
118                 final HttpServletRequest req = ctx.getRequest();
119
120                 if(clientId == null)
121                         throw new NullPointerException();
122                 
123                 boolean isAAFTopic=false;
124                 // was this host blacklisted?
125                 final String remoteAddr = Utils.getRemoteAddress(ctx);;
126                 if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
127                 {
128                         
129                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
130                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
131                                 "] is blacklisted. Please contact the cluster management team."
132                                         ,null,Utils.getFormattedDate(new Date()),topic,
133                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
134                                         null,null);
135                         LOG.info(errRes.toString());
136                         throw new CambriaApiException(errRes);
137                 }
138                 
139                 
140                 int limit = CambriaConstants.kNoLimit;
141                 if (req.getParameter("limit") != null) {
142                         limit = Integer.parseInt(req.getParameter("limit"));
143                 }
144
145                 int timeoutMs= CambriaConstants.kNoTimeout;
146                 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"timeout");
147                 if(strtimeoutMS!=null)timeoutMs=Integer.parseInt(strtimeoutMS);
148                 //int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", CambriaConstants.kNoTimeout);
149                 if (req.getParameter("timeout") != null) {
150                         timeoutMs = Integer.parseInt(req.getParameter("timeout"));
151                 }
152
153                 // By default no filter is applied if filter is not passed as a
154                 // parameter in the request URI
155                 String topicFilter = CambriaConstants.kNoFilter;
156                 if (null != req.getParameter("filter")) {
157                         topicFilter = req.getParameter("filter");
158                 }
159                 // pretty to print the messaages in new line
160                 String prettyval="0";
161                 String strPretty=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"pretty");
162                 if (null!=strPretty)prettyval=strPretty;
163                 
164                 String metaval="0";
165                 String strmeta=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"meta");
166                 if (null!=strmeta)metaval=strmeta;
167                 
168                 final boolean pretty = rrConvertor
169                                 .convertToBooleanBroad(prettyval);
170                 // withMeta to print offset along with message
171                 final boolean withMeta = rrConvertor
172                                 .convertToBooleanBroad(metaval);
173                 
174                 
175                 /*final boolean pretty = rrConvertor
176                                 .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("pretty", "0"));
177                 // withMeta to print offset along with message
178                 final boolean withMeta = rrConvertor
179                                 .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("meta", "0"));
180 */
181                 final LogWrap logger = new LogWrap ( topic, consumerGroup, clientId);
182                 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter);
183
184                 // is this user allowed to read this topic?
185                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
186                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
187                 
188                 if (metatopic == null) {
189                         // no such topic.
190                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
191                                         DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), 
192                                         errorMessages.getTopicNotExist()+"-[" + topic + "]",null,Utils.getFormattedDate(new Date()),topic,null,null,
193                                         clientId,ctx.getRequest().getRemoteHost());
194                         LOG.info(errRes.toString());
195                         throw new CambriaApiException(errRes);
196                 }
197                 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
198                 if (null==metricTopicname)
199                  metricTopicname="msgrtr.apinode.metrics.dmaap";
200                 
201                  if(null==ctx.getRequest().getHeader("Authorization")&& !topic.equalsIgnoreCase(metricTopicname))
202                 {       
203                         if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))){
204                         // check permissions
205                         metatopic.checkUserRead(user);  
206                         }
207                 }
208                 // if headers are not provided then user will be null
209                  if(user == null && null!=ctx.getRequest().getHeader("Authorization"))
210                 {
211                         // the topic name will be sent by the client
212 //                      String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
213                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
214                         String permission = aaf.aafPermissionString(topic, "sub");
215                         if(!aaf.aafAuthentication(ctx.getRequest(), permission))
216                         {
217                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
218                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
219                                                 errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,null,null,
220                                                 clientId,ctx.getRequest().getRemoteHost());
221                                 LOG.info(errRes.toString());
222                                 throw new DMaaPAccessDeniedException(errRes);
223                                 
224                         }
225                         isAAFTopic = true;
226                 }
227                 Consumer c = null;
228                 try {
229                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
230
231                         final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
232                         rl.onCall(topic, consumerGroup, clientId);
233
234                         c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs);
235
236                 /*      final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c,
237                                         ctx.getConfigReader().getSettings()).timeout(timeoutMs).limit(limit).filter(topicFilter)
238                                                         .pretty(pretty).withMeta(withMeta)
239                                                         // .atOffset(topicOffset)
240                                                         .build();*/
241                         final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs).limit(limit).filter(topicFilter)
242                                         .pretty(pretty).withMeta(withMeta).build();
243                         coes.setDmaapContext(ctx);
244                         coes.setTopic(metatopic);
245                         if( isTransEnabled() || isAAFTopic ){
246                                 coes.setTransEnabled(true);
247                         }else{
248                         coes.setTransEnabled(false);
249                         }
250                         coes.setTopicStyle(isAAFTopic);
251             
252                         DMaaPResponseBuilder.setNoCacheHeadings(ctx);
253
254                         DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
255
256                         // No IOException thrown during respondOkWithStream, so commit the
257                         // new offsets to all the brokers
258                         c.commitOffsets();
259                         final int sent = coes.getSentCount();
260
261                         metricsSet.consumeTick(sent);
262                         rl.onSend(topic, consumerGroup, clientId, sent);
263
264                         final long elapsedMs = System.currentTimeMillis() - startTime;
265                         logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset());
266
267                 } catch (UnavailableException excp) {
268                         logger.warn(excp.getMessage(), excp);
269                         
270                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, 
271                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), 
272                                         errorMessages.getServerUnav()+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
273                                         clientId,ctx.getRequest().getRemoteHost());
274                         LOG.info(errRes.toString());
275                         throw new CambriaApiException(errRes);
276                         
277                 } catch (CambriaApiException excp) {
278                         logger.warn(excp.getMessage(), excp);
279                         throw excp;
280                 } catch (Exception excp) {
281                         logger.warn("Couldn't respond to client, closing cambria consumer", excp);
282                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
283                         
284                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, 
285                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), 
286                                         "Couldn't respond to client, closing cambria consumer"+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
287                                         clientId,ctx.getRequest().getRemoteHost());
288                         LOG.info(errRes.toString());
289                         throw new CambriaApiException(errRes);
290                 } finally {
291                         // If no cache, close the consumer now that we're done with it.
292                         boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
293                         String strkSetting_EnableCache=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,ConsumerFactory.kSetting_EnableCache);
294                         if(null!=strkSetting_EnableCache) kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache);
295                         //if (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,     ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
296                         if (!kSetting_EnableCache && (c != null)) {
297                                 c.close();
298
299                         }
300                 }
301         }
302
303         /**
304          * @throws missingReqdSetting 
305          * 
306          */
307         @Override
308         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
309                         final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
310                         CambriaApiException, IOException, missingReqdSetting,DMaaPAccessDeniedException {
311
312                 // is this user allowed to write to this topic?
313                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
314                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
315                 boolean isAAFTopic=false;
316                 
317                         // was this host blacklisted?
318                                 final String remoteAddr = Utils.getRemoteAddress(ctx);
319                                 
320                                 if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
321                                 {
322                                         
323                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
324                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
325                                                 "] is blacklisted. Please contact the cluster management team."
326                                                         ,null,Utils.getFormattedDate(new Date()),topic,
327                                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
328                                                         null,null);
329                                         LOG.info(errRes.toString());
330                                         throw new CambriaApiException(errRes);
331                                 }
332                                 
333                                   String topicNameStd = null;
334                        
335                        //       topicNameStd= ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
336                         topicNameStd= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
337                         String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
338                          if (null==metricTopicname)
339                                  metricTopicname="msgrtr.apinode.metrics.dmaap";
340                         boolean topicNameEnforced=false;
341                         if (null != topicNameStd && topic.startsWith(topicNameStd)  )
342                         {
343                                 topicNameEnforced = true;
344                         }
345                 
346                        //Here check if the user has rights to publish on the topic
347                        //( This will be called when no auth is added or when UEB API Key Authentication is used)
348                        //checkUserWrite(user) method will throw an error when there is no Auth header added or when the
349                        //user has no publish rights
350                         
351                                 if(null != metatopic &&  null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) && null==ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) 
352                                 {
353                                         metatopic.checkUserWrite(user);
354                                 }
355
356         
357                                 
358                                 // if headers are not provided then user will be null
359                  if(topicNameEnforced || (user == null && null!=ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)))
360                 {
361                         // the topic name will be sent by the client
362                                                 // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
363                                                 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
364                                                 String permission = aaf.aafPermissionString(topic, "pub");
365                                                 if(!aaf.aafAuthentication(ctx.getRequest(), permission))
366                                                 {
367                                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
368                                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
369                                                                         errorMessages.getNotPermitted1()+" publish "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,
370                                                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
371                                                                         null,null);
372                                                         LOG.info(errRes.toString());
373                                                         throw new DMaaPAccessDeniedException(errRes);
374                                                 }
375                                                 isAAFTopic=true;
376                 }       
377                  
378                 final HttpServletRequest req = ctx.getRequest();
379
380                 // check for chunked input
381                 boolean chunked = false;
382                 if (null != req.getHeader(TRANSFER_ENCODING)) {
383                         chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
384                 }
385                 // get the media type, or set it to a generic value if it wasn't
386                 // provided
387                 String mediaType = req.getContentType();
388                 if (mediaType == null || mediaType.length() == 0) {
389                         mediaType = MimeTypes.kAppGenericBinary;
390                 }
391
392                 if (mediaType.contains("charset=UTF-8")) {
393                         mediaType = mediaType.replace("; charset=UTF-8", "").trim();
394                 }
395                 
396                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
397                 boolean istransidreqd=false;
398                 if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")){
399                         istransidreqd = true; 
400                 }
401                 
402                 if (isAAFTopic || istransidreqd ) {
403                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
404                 }
405                 else
406                 {
407                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
408                 }
409                         
410
411         }
412
413         /**
414          * 
415          * @param ctx
416          * @param topic
417          * @param msg
418          * @param defaultPartition
419          * @param chunked
420          * @param mediaType
421          * @throws ConfigDbException
422          * @throws AccessDeniedException
423          * @throws TopicExistsException
424          * @throws CambriaApiException
425          * @throws IOException
426          */
427         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition,
428                         boolean chunked, String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
429                                         CambriaApiException, IOException {
430                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
431
432                 // setup the event set
433                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
434
435                 // start processing, building a batch to push to the backend
436                 final long startMs = System.currentTimeMillis();
437                 long count = 0;
438                 
439                 long maxEventBatch=1024 * 16;
440                 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
441                 if(null!=batchlen)maxEventBatch=Long.parseLong(batchlen);
442                 
443                 // long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
444                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
445                 final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
446
447                 try {
448                         // for each message...
449                         Publisher.message m = null;
450                         while ((m = events.next()) != null) {
451                                 // add the message to the batch
452                                 batch.add(m);
453                                 final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
454                                                 m.getMessage());
455                                 kms.add(data);
456                                 // check if the batch is full
457                                 final int sizeNow = batch.size();
458                                 if (sizeNow > maxEventBatch) {
459                                         ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
460                                         kms.clear();
461                                         batch.clear();
462                                         metricsSet.publishTick(sizeNow);
463                                         count += sizeNow;
464                                 }
465                         }
466
467                         // send the pending batch
468                         final int sizeNow = batch.size();
469                         if (sizeNow > 0) {
470                                 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
471                                 kms.clear();
472                                 batch.clear();
473                                 metricsSet.publishTick(sizeNow);
474                                 count += sizeNow;
475                         }
476
477                         final long endMs = System.currentTimeMillis();
478                         final long totalMs = endMs - startMs;
479
480                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
481
482                         // build a responseP
483                         final JSONObject response = new JSONObject();
484                         response.put("count", count);
485                         response.put("serverTimeMs", totalMs);
486                         DMaaPResponseBuilder.respondOk(ctx, response);
487
488                 } catch (Exception excp) {
489                         int status = HttpStatus.SC_NOT_FOUND;
490                         String errorMsg=null;
491                         if(excp instanceof CambriaApiException) {
492                                  status = ((CambriaApiException) excp).getStatus();
493                                  JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
494                                  JSONObject errObject = new JSONObject(jsonTokener);
495                                  errorMsg = (String) errObject.get("message");
496                                         
497                         }
498                         ErrorResponse errRes = new ErrorResponse(status, 
499                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
500                                         errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
501                                         null,ctx.getRequest().getRemoteHost(),
502                                         null,null);
503                         LOG.info(errRes.toString());
504                         throw new CambriaApiException(errRes);
505                         
506                         
507                 }
508         }
509
510         /**
511          * 
512          * @param ctx
513          * @param inputStream
514          * @param topic
515          * @param partitionKey
516          * @param requestTime
517          * @param chunked
518          * @param mediaType
519          * @throws ConfigDbException
520          * @throws AccessDeniedException
521          * @throws TopicExistsException
522          * @throws IOException
523          * @throws CambriaApiException
524          */
525         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
526                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
527                                         throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
528                                         CambriaApiException {
529
530                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
531
532                 // setup the event set
533                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
534
535                 // start processing, building a batch to push to the backend
536                 final long startMs = System.currentTimeMillis();
537                 long count = 0;
538                 long maxEventBatch =  (long)1024 * 16;
539                 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
540                         if(null!=evenlen)maxEventBatch=Long.parseLong(evenlen);
541                 //final long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
542                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
543                 final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
544
545                 Publisher.message m = null;
546                 int messageSequence = 1;
547                 Long batchId = 1L;
548                 final boolean transactionEnabled = true;
549                 int publishBatchCount=0;
550                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
551
552                 //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
553                 try {
554                         // for each message...
555                         batchId=DMaaPContext.getBatchID();
556                         
557                         String responseTransactionId = null;
558                         
559                         while ((m = events.next()) != null) {
560                         
561                                 //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
562                                 
563
564                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
565                                                 transactionEnabled);
566                                 messageSequence++;
567
568                                 // add the message to the batch
569                                 batch.add(m);
570                                 
571                                 responseTransactionId = m.getLogDetails().getTransactionId();
572                                 
573                                 JSONObject jsonObject = new JSONObject();
574                                 jsonObject.put("message", m.getMessage());
575                                 jsonObject.put("transactionId", responseTransactionId);
576                                 final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
577                                                 jsonObject.toString());
578                                 kms.add(data);
579
580                                 // check if the batch is full
581                                 final int sizeNow = batch.size();
582                                 if (sizeNow >= maxEventBatch) {
583                                         String startTime = sdf.format(new Date());
584                                         LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
585                                         try {
586                                                 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
587                                                 //transactionLogs(batch);
588                                                 for (message msg : batch) {
589                                                         LogDetails logDetails = msg.getLogDetails();
590                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
591                                                 }
592                                         } catch (Exception excp) {
593                                                 
594                                                 int status = HttpStatus.SC_NOT_FOUND;
595                                                 String errorMsg=null;
596                                                 if(excp instanceof CambriaApiException) {
597                                                          status = ((CambriaApiException) excp).getStatus();
598                                                          JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
599                                                          JSONObject errObject = new JSONObject(jsonTokener);
600                                                          errorMsg = (String) errObject.get("message");
601                                                 }
602                                                 ErrorResponse errRes = new ErrorResponse(status, 
603                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
604                                                                 "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+ "."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,
605                                                                 null,Utils.getFormattedDate(new Date()),topic,
606                                                                 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
607                                                                 null,null);
608                                                 LOG.info(errRes.toString());
609                                                 throw new CambriaApiException(errRes);
610                                         }
611                                         kms.clear();
612                                         batch.clear();
613                                         metricsSet.publishTick(sizeNow);
614                                         publishBatchCount=sizeNow;
615                                         count += sizeNow;
616                                         //batchId++;
617                                         String endTime = sdf.format(new Date());
618                                         LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
619                                                         + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
620                                         batchId=DMaaPContext.getBatchID();
621                                 }
622                         }
623
624                         // send the pending batch
625                         final int sizeNow = batch.size();
626                         if (sizeNow > 0) {
627                                 String startTime = sdf.format(new Date());
628                                 LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
629                                 try {
630                                         ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
631                                         //transactionLogs(batch);
632                                         for (message msg : batch) {
633                                                 LogDetails logDetails = msg.getLogDetails();
634                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
635                                         }
636                                 } catch (Exception excp) {
637                                         int status = HttpStatus.SC_NOT_FOUND;
638                                         String errorMsg=null;
639                                         if(excp instanceof CambriaApiException) {
640                                                  status = ((CambriaApiException) excp).getStatus();
641                                                  JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
642                                                  JSONObject errObject = new JSONObject(jsonTokener);
643                                                  errorMsg = (String) errObject.get("message");
644                                         }
645                                         
646                                         ErrorResponse errRes = new ErrorResponse(status, 
647                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
648                                                         "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+ errorMessages.getPublishMsgCount()+count+"."+errorMsg,
649                                                         null,Utils.getFormattedDate(new Date()),topic,
650                                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
651                                                         null,null);
652                                         LOG.info(errRes.toString());
653                                         throw new CambriaApiException(errRes);
654                                 }
655                                 kms.clear();
656                                 metricsSet.publishTick(sizeNow);
657                                 count += sizeNow;
658                                 //batchId++;
659                                 String endTime = sdf.format(new Date());
660                                 publishBatchCount=sizeNow;
661                                 LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
662                                                 + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
663                         }
664
665                         final long endMs = System.currentTimeMillis();
666                         final long totalMs = endMs - startMs;
667
668                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
669
670                         if (null != responseTransactionId) {
671                                 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
672                         }
673                         
674                         // build a response
675                         final JSONObject response = new JSONObject();
676                         response.put("count", count);
677                         response.put("serverTimeMs", totalMs);
678                         DMaaPResponseBuilder.respondOk(ctx, response);
679                         
680                 } catch (Exception excp) {
681                         int status = HttpStatus.SC_NOT_FOUND;
682                         String errorMsg=null;
683                         if(excp instanceof CambriaApiException) {
684                                  status = ((CambriaApiException) excp).getStatus();
685                                  JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
686                                  JSONObject errObject = new JSONObject(jsonTokener);
687                                  errorMsg = (String) errObject.get("message");
688                         }
689                         
690                         ErrorResponse errRes = new ErrorResponse(
691                                         status, 
692                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
693                                         "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
694                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
695                                         null,null);
696                         LOG.info(errRes.toString());
697                         throw new CambriaApiException(errRes);
698                 }
699         }
700
701         /**
702          * 
703          * @param msg
704          * @param topic
705          * @param request
706          * @param messageCreationTime
707          * @param messageSequence
708          * @param batchId
709          * @param transactionEnabled
710          */
711         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
712                         final String messageCreationTime, final int messageSequence, final Long batchId,
713                         final boolean transactionEnabled) {
714                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
715                                 transactionEnabled);
716                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
717                 msg.setTransactionEnabled(transactionEnabled);
718                 msg.setLogDetails(logDetails);
719         }
720
721
722
723         /**
724          * 
725          * @author author
726          *
727          */
728         private static class LogWrap {
729                 private final String fId;
730
731                 /**
732                  * constructor initialization
733                  * 
734                  * @param topic
735                  * @param cgroup
736                  * @param cid
737                  */
738                 public LogWrap(String topic, String cgroup, String cid) {
739                         fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
740                 }
741
742                 /**
743                  * 
744                  * @param msg
745                  */
746                 public void info(String msg) {
747                         LOG.info(fId + msg);
748                 }
749
750                 /**
751                  * 
752                  * @param msg
753                  * @param t
754                  */
755                 public void warn(String msg, Exception t) {
756                         LOG.warn(fId + msg, t);
757                 }
758
759         }
760         
761         private boolean isTransEnabled() {
762                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
763                 boolean istransidreqd=false;
764                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) ){
765                         istransidreqd = true; 
766                 }
767                 
768                 return istransidreqd;
769
770         }
771
772         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
773                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
774                 LogDetails logDetails = new LogDetails();
775                 logDetails.setTopicId(topicName);
776                 logDetails.setMessageTimestamp(messageTimestamp);
777                 logDetails.setPublisherId(Utils.getUserApiKey(request));
778                 logDetails.setPublisherIp(request.getRemoteHost());
779                 logDetails.setMessageBatchId(batchId);
780                 logDetails.setMessageSequence(String.valueOf(messageSequence));
781                 logDetails.setTransactionEnabled(transactionEnabled);
782                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
783                 logDetails.setServerIp(request.getLocalAddr());
784                 return logDetails;
785         }
786
787         /*public String getMetricsTopic() {
788                 return metricsTopic;
789         }
790
791         public void setMetricsTopic(String metricsTopic) {
792                 this.metricsTopic = metricsTopic;
793         }*/
794
795 }