dce5a195fed3f626dbc3fbc9c232f19c68ed8e19
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / messagerouter / msgrtr / nsa / cambria / service / impl / EventsServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.service.impl;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.text.SimpleDateFormat;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.LinkedList;
30
31 import javax.servlet.http.HttpServletRequest;
32 import javax.ws.rs.core.MediaType;
33
34 import org.apache.http.HttpStatus;
35
36 import com.att.eelf.configuration.EELFLogger;
37 import com.att.eelf.configuration.EELFManager;
38 import org.json.JSONObject;
39 import org.json.JSONTokener;
40 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.CambriaApiException;
41 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Consumer;
42 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.ConsumerFactory;
43 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.MetricsSet;
44 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Publisher;
45 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.ConsumerFactory.UnavailableException;
46 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.backends.Publisher.message;
47 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans.DMaaPCambriaLimiter;
48 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans.DMaaPContext;
49 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.beans.LogDetails;
50 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.constants.CambriaConstants;
51 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.exception.DMaaPAccessDeniedException;
52 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.exception.DMaaPErrorMessages;
53 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.exception.DMaaPResponseCode;
54 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.exception.ErrorResponse;
55 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Topic;
56 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.metabroker.Broker.TopicExistsException;
57 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources.CambriaEventSet;
58 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.resources.CambriaOutboundEventStream;
59 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.security.DMaaPAAFAuthenticator;
60 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
61 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.security.DMaaPAuthenticatorImpl;
62 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.service.EventsService;
63 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.DMaaPResponseBuilder;
64 import org.onap.dmaap.messagerouter.msgrtr.nsa.cambria.utils.Utils;
65 import org.springframework.beans.factory.annotation.Autowired;
66 import org.springframework.stereotype.Service;
67
68 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
69 import com.att.nsa.configs.ConfigDbException;
70 import com.att.nsa.drumlin.service.standards.MimeTypes;
71 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
72 import com.att.nsa.security.NsaApiKey;
73 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
74 import com.att.nsa.util.rrConvertor;
75
76 import kafka.producer.KeyedMessage;
77
78 /**
79  * This class provides the functinality to publish and subscribe message to
80  * kafka
81  * 
82  * @author author
83  *
84  */
85 @Service
86 public class EventsServiceImpl implements EventsService {
87         //private static final Logger LOG = Logger.getLogger(EventsServiceImpl.class);
88         private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
89
90         private static final String BATCH_LENGTH = "event.batch.length";
91         private static final String TRANSFER_ENCODING = "Transfer-Encoding";
92         @Autowired
93         private DMaaPErrorMessages errorMessages;
94
95         //@Value("${metrics.send.cambria.topic}")
96         //private String metricsTopic;
97         
98         /**
99          * @param ctx
100          * @param topic
101          * @param consumerGroup
102          * @param clientId
103          * @throws ConfigDbException,
104          *             TopicExistsException, AccessDeniedException,
105          *             UnavailableException, CambriaApiException, IOException
106          * 
107          * 
108          */
109         @Override
110         public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
111                         throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
112                         CambriaApiException, IOException,DMaaPAccessDeniedException {
113                 final long startTime = System.currentTimeMillis();
114                 final HttpServletRequest req = ctx.getRequest();
115
116                 boolean isAAFTopic=false;
117                 // was this host blacklisted?
118                 final String remoteAddr = Utils.getRemoteAddress(ctx);;
119                 if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
120                 {
121                         
122                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
123                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
124                                 "] is blacklisted. Please contact the cluster management team."
125                                         ,null,Utils.getFormattedDate(new Date()),topic,
126                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
127                                         null,null);
128                         LOG.info(errRes.toString());
129                         throw new CambriaApiException(errRes);
130                 }
131                 
132                 
133                 int limit = CambriaConstants.kNoLimit;
134                 if (req.getParameter("limit") != null) {
135                         limit = Integer.parseInt(req.getParameter("limit"));
136                 }
137
138                 int timeoutMs= CambriaConstants.kNoTimeout;
139                 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"timeout");
140                 if(strtimeoutMS!=null)timeoutMs=Integer.parseInt(strtimeoutMS);
141                 //int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", CambriaConstants.kNoTimeout);
142                 if (req.getParameter("timeout") != null) {
143                         timeoutMs = Integer.parseInt(req.getParameter("timeout"));
144                 }
145
146                 // By default no filter is applied if filter is not passed as a
147                 // parameter in the request URI
148                 String topicFilter = CambriaConstants.kNoFilter;
149                 if (null != req.getParameter("filter")) {
150                         topicFilter = req.getParameter("filter");
151                 }
152                 // pretty to print the messaages in new line
153                 String prettyval="0";
154                 String strPretty=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"pretty");
155                 if (null!=strPretty)prettyval=strPretty;
156                 
157                 String metaval="0";
158                 String strmeta=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"meta");
159                 if (null!=strmeta)metaval=strmeta;
160                 
161                 final boolean pretty = rrConvertor
162                                 .convertToBooleanBroad(prettyval);
163                 // withMeta to print offset along with message
164                 final boolean withMeta = rrConvertor
165                                 .convertToBooleanBroad(metaval);
166                 
167                 
168                 /*final boolean pretty = rrConvertor
169                                 .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("pretty", "0"));
170                 // withMeta to print offset along with message
171                 final boolean withMeta = rrConvertor
172                                 .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("meta", "0"));
173 */
174                 final LogWrap logger = new LogWrap ( topic, consumerGroup, clientId);
175                 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter);
176
177                 // is this user allowed to read this topic?
178                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
179                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
180                 
181                 if (metatopic == null) {
182                         // no such topic.
183                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
184                                         DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), 
185                                         errorMessages.getTopicNotExist()+"-[" + topic + "]",null,Utils.getFormattedDate(new Date()),topic,null,null,
186                                         clientId,ctx.getRequest().getRemoteHost());
187                         LOG.info(errRes.toString());
188                         throw new CambriaApiException(errRes);
189                 }
190                 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
191                 if (null==metricTopicname)
192                  metricTopicname="msgrtr.apinode.metrics.dmaap";
193                 
194                  if(null==ctx.getRequest().getHeader("Authorization")&& !topic.equalsIgnoreCase(metricTopicname))
195                 {       
196                         if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))){
197                         // check permissions
198                         metatopic.checkUserRead(user);  
199                         }
200                 }
201                 // if headers are not provided then user will be null
202                  if(user == null && null!=ctx.getRequest().getHeader("Authorization"))
203                 {
204                         // the topic name will be sent by the client
205 //                      String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
206                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
207                         String permission = aaf.aafPermissionString(topic, "sub");
208                         if(!aaf.aafAuthentication(ctx.getRequest(), permission))
209                         {
210                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
211                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
212                                                 errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,null,null,
213                                                 clientId,ctx.getRequest().getRemoteHost());
214                                 LOG.info(errRes.toString());
215                                 throw new DMaaPAccessDeniedException(errRes);
216                                 
217                         }
218                         isAAFTopic = true;
219                 }
220                 Consumer c = null;
221                 try {
222                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
223
224                         final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
225                         rl.onCall(topic, consumerGroup, clientId);
226
227                         c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs);
228
229                 /*      final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c,
230                                         ctx.getConfigReader().getSettings()).timeout(timeoutMs).limit(limit).filter(topicFilter)
231                                                         .pretty(pretty).withMeta(withMeta)
232                                                         // .atOffset(topicOffset)
233                                                         .build();*/
234                         final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs).limit(limit).filter(topicFilter)
235                                         .pretty(pretty).withMeta(withMeta).build();
236                         coes.setDmaapContext(ctx);
237                         coes.setTopic(metatopic);
238                         if( isTransEnabled() || isAAFTopic ){
239                                 coes.setTransEnabled(true);
240                         }else{
241                         coes.setTransEnabled(false);
242                         }
243                         coes.setTopicStyle(isAAFTopic);
244             
245                         DMaaPResponseBuilder.setNoCacheHeadings(ctx);
246
247                         DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
248
249                         // No IOException thrown during respondOkWithStream, so commit the
250                         // new offsets to all the brokers
251                         c.commitOffsets();
252                         final int sent = coes.getSentCount();
253
254                         metricsSet.consumeTick(sent);
255                         rl.onSend(topic, consumerGroup, clientId, sent);
256
257                         final long elapsedMs = System.currentTimeMillis() - startTime;
258                         logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset());
259
260                 } catch (UnavailableException excp) {
261                         logger.warn(excp.getMessage(), excp);
262                         
263                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, 
264                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), 
265                                         errorMessages.getServerUnav()+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
266                                         clientId,ctx.getRequest().getRemoteHost());
267                         LOG.info(errRes.toString());
268                         throw new CambriaApiException(errRes);
269                         
270                 } catch (CambriaApiException excp) {
271                         logger.warn(excp.getMessage(), excp);
272                         throw excp;
273                 } catch (Exception excp) {
274                         logger.warn("Couldn't respond to client, closing cambria consumer", excp);
275                         ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
276                         
277                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, 
278                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), 
279                                         "Couldn't respond to client, closing cambria consumer"+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
280                                         clientId,ctx.getRequest().getRemoteHost());
281                         LOG.info(errRes.toString());
282                         throw new CambriaApiException(errRes);
283                 } finally {
284                         // If no cache, close the consumer now that we're done with it.
285                         boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
286                         String strkSetting_EnableCache=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,ConsumerFactory.kSetting_EnableCache);
287                         if(null!=strkSetting_EnableCache) kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache);
288                         //if (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,     ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
289                         if (!kSetting_EnableCache && (c != null)) {
290                                 c.close();
291
292                         }
293                 }
294         }
295
296         /**
297          * @throws missingReqdSetting 
298          * 
299          */
300         @Override
301         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
302                         final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
303                         CambriaApiException, IOException, missingReqdSetting,DMaaPAccessDeniedException {
304
305                 // is this user allowed to write to this topic?
306                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
307                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
308                 boolean isAAFTopic=false;
309                 
310                         // was this host blacklisted?
311                                 final String remoteAddr = Utils.getRemoteAddress(ctx);
312                                 
313                                 if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
314                                 {
315                                         
316                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
317                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
318                                                 "] is blacklisted. Please contact the cluster management team."
319                                                         ,null,Utils.getFormattedDate(new Date()),topic,
320                                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
321                                                         null,null);
322                                         LOG.info(errRes.toString());
323                                         throw new CambriaApiException(errRes);
324                                 }
325                                 
326                                   String topicNameStd = null;
327                        
328                        //       topicNameStd= ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
329                         topicNameStd= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
330                         String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
331                          if (null==metricTopicname)
332                                  metricTopicname="msgrtr.apinode.metrics.dmaap";
333                         boolean topicNameEnforced=false;
334                         if (null != topicNameStd && topic.startsWith(topicNameStd)  )
335                         {
336                                 topicNameEnforced = true;
337                         }
338                 
339                        //Here check if the user has rights to publish on the topic
340                        //( This will be called when no auth is added or when UEB API Key Authentication is used)
341                        //checkUserWrite(user) method will throw an error when there is no Auth header added or when the
342                        //user has no publish rights
343                         
344                                 if(null != metatopic &&  null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) && null==ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) 
345                                 {
346                                         metatopic.checkUserWrite(user);
347                                 }
348
349         
350                                 
351                                 // if headers are not provided then user will be null
352                  if(topicNameEnforced || (user == null && null!=ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)))
353                 {
354                         // the topic name will be sent by the client
355                                                 // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
356                                                 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
357                                                 String permission = aaf.aafPermissionString(topic, "pub");
358                                                 if(!aaf.aafAuthentication(ctx.getRequest(), permission))
359                                                 {
360                                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
361                                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
362                                                                         errorMessages.getNotPermitted1()+" publish "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,
363                                                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
364                                                                         null,null);
365                                                         LOG.info(errRes.toString());
366                                                         throw new DMaaPAccessDeniedException(errRes);
367                                                 }
368                                                 isAAFTopic=true;
369                 }       
370                  
371                 final HttpServletRequest req = ctx.getRequest();
372
373                 // check for chunked input
374                 boolean chunked = false;
375                 if (null != req.getHeader(TRANSFER_ENCODING)) {
376                         chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
377                 }
378                 // get the media type, or set it to a generic value if it wasn't
379                 // provided
380                 String mediaType = req.getContentType();
381                 if (mediaType == null || mediaType.length() == 0) {
382                         mediaType = MimeTypes.kAppGenericBinary;
383                 }
384
385                 if (mediaType.contains("charset=UTF-8")) {
386                         mediaType = mediaType.replace("; charset=UTF-8", "").trim();
387                 }
388                 
389                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
390                 boolean istransidreqd=false;
391                 if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")){
392                         istransidreqd = true; 
393                 }
394                 
395                 if (isAAFTopic || istransidreqd ) {
396                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
397                 }
398                 else
399                 {
400                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
401                 }
402                         
403
404         }
405
406         /**
407          * 
408          * @param ctx
409          * @param topic
410          * @param msg
411          * @param defaultPartition
412          * @param chunked
413          * @param mediaType
414          * @throws ConfigDbException
415          * @throws AccessDeniedException
416          * @throws TopicExistsException
417          * @throws CambriaApiException
418          * @throws IOException
419          */
420         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition,
421                         boolean chunked, String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
422                                         CambriaApiException, IOException {
423                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
424
425                 // setup the event set
426                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
427
428                 // start processing, building a batch to push to the backend
429                 final long startMs = System.currentTimeMillis();
430                 long count = 0;
431                 
432                 long maxEventBatch=1024 * 16;
433                 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
434                 if(null!=batchlen)maxEventBatch=Long.parseLong(batchlen);
435                 
436                 // long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
437                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
438                 final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
439
440                 try {
441                         // for each message...
442                         Publisher.message m = null;
443                         while ((m = events.next()) != null) {
444                                 // add the message to the batch
445                                 batch.add(m);
446                                 final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
447                                                 m.getMessage());
448                                 kms.add(data);
449                                 // check if the batch is full
450                                 final int sizeNow = batch.size();
451                                 if (sizeNow > maxEventBatch) {
452                                         ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
453                                         kms.clear();
454                                         batch.clear();
455                                         metricsSet.publishTick(sizeNow);
456                                         count += sizeNow;
457                                 }
458                         }
459
460                         // send the pending batch
461                         final int sizeNow = batch.size();
462                         if (sizeNow > 0) {
463                                 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
464                                 kms.clear();
465                                 batch.clear();
466                                 metricsSet.publishTick(sizeNow);
467                                 count += sizeNow;
468                         }
469
470                         final long endMs = System.currentTimeMillis();
471                         final long totalMs = endMs - startMs;
472
473                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
474
475                         // build a responseP
476                         final JSONObject response = new JSONObject();
477                         response.put("count", count);
478                         response.put("serverTimeMs", totalMs);
479                         DMaaPResponseBuilder.respondOk(ctx, response);
480
481                 } catch (Exception excp) {
482                         int status = HttpStatus.SC_NOT_FOUND;
483                         String errorMsg=null;
484                         if(excp instanceof CambriaApiException) {
485                                  status = ((CambriaApiException) excp).getStatus();
486                                  JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
487                                  JSONObject errObject = new JSONObject(jsonTokener);
488                                  errorMsg = (String) errObject.get("message");
489                                         
490                         }
491                         ErrorResponse errRes = new ErrorResponse(status, 
492                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
493                                         errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
494                                         null,ctx.getRequest().getRemoteHost(),
495                                         null,null);
496                         LOG.info(errRes.toString());
497                         throw new CambriaApiException(errRes);
498                         
499                         
500                 }
501         }
502
503         /**
504          * 
505          * @param ctx
506          * @param inputStream
507          * @param topic
508          * @param partitionKey
509          * @param requestTime
510          * @param chunked
511          * @param mediaType
512          * @throws ConfigDbException
513          * @throws AccessDeniedException
514          * @throws TopicExistsException
515          * @throws IOException
516          * @throws CambriaApiException
517          */
518         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
519                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
520                                         throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
521                                         CambriaApiException {
522
523                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
524
525                 // setup the event set
526                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
527
528                 // start processing, building a batch to push to the backend
529                 final long startMs = System.currentTimeMillis();
530                 long count = 0;
531                 long maxEventBatch =  1024 * 16;
532                 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
533                         if(null!=evenlen)maxEventBatch=Long.parseLong(evenlen);
534                 //final long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
535                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
536                 final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
537
538                 Publisher.message m = null;
539                 int messageSequence = 1;
540                 Long batchId = 1L;
541                 final boolean transactionEnabled = true;
542                 int publishBatchCount=0;
543                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
544
545                 //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
546                 try {
547                         // for each message...
548                         batchId=DMaaPContext.getBatchID();
549                         
550                         String responseTransactionId = null;
551                         
552                         while ((m = events.next()) != null) {
553                         
554                                 //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
555                                 
556
557                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
558                                                 transactionEnabled);
559                                 messageSequence++;
560
561                                 // add the message to the batch
562                                 batch.add(m);
563                                 
564                                 responseTransactionId = m.getLogDetails().getTransactionId();
565                                 
566                                 JSONObject jsonObject = new JSONObject();
567                                 jsonObject.put("message", m.getMessage());
568                                 jsonObject.put("transactionId", responseTransactionId);
569                                 final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
570                                                 jsonObject.toString());
571                                 kms.add(data);
572
573                                 // check if the batch is full
574                                 final int sizeNow = batch.size();
575                                 if (sizeNow >= maxEventBatch) {
576                                         String startTime = sdf.format(new Date());
577                                         LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
578                                         try {
579                                                 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
580                                                 //transactionLogs(batch);
581                                                 for (message msg : batch) {
582                                                         LogDetails logDetails = msg.getLogDetails();
583                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
584                                                 }
585                                         } catch (Exception excp) {
586                                                 
587                                                 int status = HttpStatus.SC_NOT_FOUND;
588                                                 String errorMsg=null;
589                                                 if(excp instanceof CambriaApiException) {
590                                                          status = ((CambriaApiException) excp).getStatus();
591                                                          JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
592                                                          JSONObject errObject = new JSONObject(jsonTokener);
593                                                          errorMsg = (String) errObject.get("message");
594                                                 }
595                                                 ErrorResponse errRes = new ErrorResponse(status, 
596                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
597                                                                 "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+ "."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,
598                                                                 null,Utils.getFormattedDate(new Date()),topic,
599                                                                 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
600                                                                 null,null);
601                                                 LOG.info(errRes.toString());
602                                                 throw new CambriaApiException(errRes);
603                                         }
604                                         kms.clear();
605                                         batch.clear();
606                                         metricsSet.publishTick(sizeNow);
607                                         publishBatchCount=sizeNow;
608                                         count += sizeNow;
609                                         //batchId++;
610                                         String endTime = sdf.format(new Date());
611                                         LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
612                                                         + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
613                                         batchId=DMaaPContext.getBatchID();
614                                 }
615                         }
616
617                         // send the pending batch
618                         final int sizeNow = batch.size();
619                         if (sizeNow > 0) {
620                                 String startTime = sdf.format(new Date());
621                                 LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
622                                 try {
623                                         ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
624                                         //transactionLogs(batch);
625                                         for (message msg : batch) {
626                                                 LogDetails logDetails = msg.getLogDetails();
627                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
628                                         }
629                                 } catch (Exception excp) {
630                                         int status = HttpStatus.SC_NOT_FOUND;
631                                         String errorMsg=null;
632                                         if(excp instanceof CambriaApiException) {
633                                                  status = ((CambriaApiException) excp).getStatus();
634                                                  JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
635                                                  JSONObject errObject = new JSONObject(jsonTokener);
636                                                  errorMsg = (String) errObject.get("message");
637                                         }
638                                         
639                                         ErrorResponse errRes = new ErrorResponse(status, 
640                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
641                                                         "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+ errorMessages.getPublishMsgCount()+count+"."+errorMsg,
642                                                         null,Utils.getFormattedDate(new Date()),topic,
643                                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
644                                                         null,null);
645                                         LOG.info(errRes.toString());
646                                         throw new CambriaApiException(errRes);
647                                 }
648                                 kms.clear();
649                                 metricsSet.publishTick(sizeNow);
650                                 count += sizeNow;
651                                 //batchId++;
652                                 String endTime = sdf.format(new Date());
653                                 publishBatchCount=sizeNow;
654                                 LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
655                                                 + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
656                         }
657
658                         final long endMs = System.currentTimeMillis();
659                         final long totalMs = endMs - startMs;
660
661                         LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
662
663                         if (null != responseTransactionId) {
664                                 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
665                         }
666                         
667                         // build a response
668                         final JSONObject response = new JSONObject();
669                         response.put("count", count);
670                         response.put("serverTimeMs", totalMs);
671                         DMaaPResponseBuilder.respondOk(ctx, response);
672                         
673                 } catch (Exception excp) {
674                         int status = HttpStatus.SC_NOT_FOUND;
675                         String errorMsg=null;
676                         if(excp instanceof CambriaApiException) {
677                                  status = ((CambriaApiException) excp).getStatus();
678                                  JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
679                                  JSONObject errObject = new JSONObject(jsonTokener);
680                                  errorMsg = (String) errObject.get("message");
681                         }
682                         
683                         ErrorResponse errRes = new ErrorResponse(
684                                         status, 
685                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), 
686                                         "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
687                                         Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
688                                         null,null);
689                         LOG.info(errRes.toString());
690                         throw new CambriaApiException(errRes);
691                 }
692         }
693
694         /**
695          * 
696          * @param msg
697          * @param topic
698          * @param request
699          * @param messageCreationTime
700          * @param messageSequence
701          * @param batchId
702          * @param transactionEnabled
703          */
704         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
705                         final String messageCreationTime, final int messageSequence, final Long batchId,
706                         final boolean transactionEnabled) {
707                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
708                                 transactionEnabled);
709                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
710                 msg.setTransactionEnabled(transactionEnabled);
711                 msg.setLogDetails(logDetails);
712         }
713
714
715
716         /**
717          * 
718          * @author author
719          *
720          */
721         private static class LogWrap {
722                 private final String fId;
723
724                 /**
725                  * constructor initialization
726                  * 
727                  * @param topic
728                  * @param cgroup
729                  * @param cid
730                  */
731                 public LogWrap(String topic, String cgroup, String cid) {
732                         fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
733                 }
734
735                 /**
736                  * 
737                  * @param msg
738                  */
739                 public void info(String msg) {
740                         LOG.info(fId + msg);
741                 }
742
743                 /**
744                  * 
745                  * @param msg
746                  * @param t
747                  */
748                 public void warn(String msg, Exception t) {
749                         LOG.warn(fId + msg, t);
750                 }
751
752         }
753         
754         private boolean isTransEnabled() {
755                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
756                 boolean istransidreqd=false;
757                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) ){
758                         istransidreqd = true; 
759                 }
760                 
761                 return istransidreqd;
762
763         }
764
765         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
766                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
767                 LogDetails logDetails = new LogDetails();
768                 logDetails.setTopicId(topicName);
769                 logDetails.setMessageTimestamp(messageTimestamp);
770                 logDetails.setPublisherId(Utils.getUserApiKey(request));
771                 logDetails.setPublisherIp(request.getRemoteHost());
772                 logDetails.setMessageBatchId(batchId);
773                 logDetails.setMessageSequence(String.valueOf(messageSequence));
774                 logDetails.setTransactionEnabled(transactionEnabled);
775                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
776                 logDetails.setServerIp(request.getLocalAddr());
777                 return logDetails;
778         }
779
780         /*public String getMetricsTopic() {
781                 return metricsTopic;
782         }
783
784         public void setMetricsTopic(String metricsTopic) {
785                 this.metricsTopic = metricsTopic;
786         }*/
787
788 }