2f436eca764ce16b64cc3f4af2f8993ff61558ad
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / EventsServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.net.InetAddress;
27 import java.net.UnknownHostException;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.LinkedList;
34 import java.util.Properties;
35
36 import javax.servlet.http.HttpServletRequest;
37 import javax.ws.rs.core.MediaType;
38
39 import org.apache.http.HttpStatus;
40 import org.apache.kafka.clients.consumer.ConsumerRecord;
41 import org.apache.kafka.clients.consumer.ConsumerRecords;
42 import org.apache.kafka.clients.consumer.KafkaConsumer;
43 import org.apache.kafka.clients.producer.ProducerRecord;
44 import org.apache.kafka.common.errors.TopicExistsException;
45 import org.json.JSONObject;
46 import org.json.JSONTokener;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.beans.factory.annotation.Qualifier;
49 import org.springframework.stereotype.Service;
50
51 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
52 import org.onap.dmaap.dmf.mr.CambriaApiException;
53 import org.onap.dmaap.dmf.mr.backends.Consumer;
54 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
55 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
56 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
57 import org.onap.dmaap.dmf.mr.backends.Publisher;
58 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
59 import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
60 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
61 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
62 import org.onap.dmaap.dmf.mr.beans.LogDetails;
63 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
64 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
65 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
66 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
67 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
68
69 import org.onap.dmaap.dmf.mr.metabroker.Topic;
70 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
71 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
72 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
73 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
74 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
75 import org.onap.dmaap.dmf.mr.service.EventsService;
76 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
77 import org.onap.dmaap.dmf.mr.utils.Utils;
78 import com.att.eelf.configuration.EELFLogger;
79 import com.att.eelf.configuration.EELFManager;
80 import com.att.nsa.configs.ConfigDbException;
81 import com.att.nsa.drumlin.service.standards.MimeTypes;
82 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
83 import com.att.nsa.security.NsaApiKey;
84 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
85 import com.att.nsa.util.rrConvertor;
86
87 /**
 * This class provides the functionality to publish messages to, and subscribe
 * to messages from, Kafka.
90  * 
91  * @author Ramkumar Sembaiyam
92  *
93  */
94 @Service
95 public class EventsServiceImpl implements EventsService {
96         // private static final Logger LOG =
97         
98         private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
99
100         private static final String BATCH_LENGTH = "event.batch.length";
101         private static final String TRANSFER_ENCODING = "Transfer-Encoding";
102         @Autowired
103         private DMaaPErrorMessages errorMessages;
104         
105         //@Autowired
106         
107
108         // @Value("${metrics.send.cambria.topic}")
109         
110
111         public DMaaPErrorMessages getErrorMessages() {
112                 return errorMessages;
113         }
114
115         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
116                 this.errorMessages = errorMessages;
117         }
118
119         /**
120          * @param ctx
121          * @param topic
122          * @param consumerGroup
123          * @param clientId
124          * @throws ConfigDbException,
125          *             TopicExistsException, AccessDeniedException,
126          *             UnavailableException, CambriaApiException, IOException
127          * 
128          * 
129          */
130         @Override
131         public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
132                         throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
133                         CambriaApiException, IOException, DMaaPAccessDeniedException {
134                 final long startTime = System.currentTimeMillis();
135                 final HttpServletRequest req = ctx.getRequest();
136         
137                 boolean isAAFTopic = false;
138                 // was this host blacklisted?
139                 final String remoteAddr = Utils.getRemoteAddress(ctx);
140                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
141
142                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
143                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
144                                         "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
145                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
146                                         ctx.getRequest().getRemoteHost(), null, null);
147                         LOG.info(errRes.toString());
148                         throw new CambriaApiException(errRes);
149                 }
150
151                 int limit = CambriaConstants.kNoLimit;
152                 if (req.getParameter("limit") != null) {
153                         limit = Integer.parseInt(req.getParameter("limit"));
154                 }
155
156                 int timeoutMs = CambriaConstants.kNoTimeout;
157                 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
158                 if (strtimeoutMS != null)
159                         timeoutMs = Integer.parseInt(strtimeoutMS);
160                 // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
161                 
162                 if (req.getParameter("timeout") != null) {
163                         timeoutMs = Integer.parseInt(req.getParameter("timeout"));
164                 }
165
166                 // By default no filter is applied if filter is not passed as a
167                 // parameter in the request URI
168                 String topicFilter = CambriaConstants.kNoFilter;
169                 if (null != req.getParameter("filter")) {
170                         topicFilter = req.getParameter("filter");
171                 }
172                 // pretty to print the messaages in new line
173                 String prettyval = "0";
174                 String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
175                 if (null != strPretty)
176                         prettyval = strPretty;
177
178                 String metaval = "0";
179                 String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
180                 if (null != strmeta)
181                         metaval = strmeta;
182
183                 final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
184                 // withMeta to print offset along with message
185                 final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
186
187                 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
188                 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
189
190                 // is this user allowed to read this topic?
191                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
192                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
193
194                 if (metatopic == null) {
195                         // no such topic.
196                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
197                                         DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
198                                         errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
199                                         topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost());
200                         LOG.info(errRes.toString());
201                         throw new CambriaApiException(errRes);
202                 }
203                 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
204                                 "metrics.send.cambria.topic");
205                 if (null == metricTopicname)
206                         metricTopicname = "msgrtr.apinode.metrics.dmaap";
207
208                 if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
209                         if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) {
210                                 // check permissions
211                                 metatopic.checkUserRead(user);
212                         }
213                 }
214                 // if headers are not provided then user will be null
215                 if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
216                         // the topic name will be sent by the client
217                         
218                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
219                         String permission = aaf.aafPermissionString(topic, "sub");
220                         if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
221                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
222                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
223                                                 errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on "
224                                                                 + permission,
225                                                 null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId,
226                                                 ctx.getRequest().getRemoteHost());
227                                 LOG.info(errRes.toString());
228                                 throw new DMaaPAccessDeniedException(errRes);
229
230                         }
231                         isAAFTopic = true;
232                 }
233                 final long elapsedMs1 = System.currentTimeMillis() - startTime;
234                 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
235                                 + " " + clientId);
236                 Consumer c = null;
237                 
238                 String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
239                                 "clusterhostid");
240                 if (null == lhostId) {
241                         try {
242                                 lhostId = InetAddress.getLocalHost().getCanonicalHostName();
243                         } catch (UnknownHostException e) {
244                                 LOG.info("Unknown Host Exception error occured while getting getting hostid");
245                         }
246
247                 }
248                  CambriaOutboundEventStream coes = null;
249                 try {
250                         final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
251                         final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
252                         rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost());
253                         c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
254                                         ctx.getRequest().getRemoteHost());
255                         coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
256                                         .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
257                         coes.setDmaapContext(ctx);
258                         coes.setTopic(metatopic);
259                         if (isTransEnabled() || isAAFTopic) {
260                                 coes.setTransEnabled(true);
261                         } else {
262                                 coes.setTransEnabled(false);
263                         }
264                         coes.setTopicStyle(isAAFTopic);
265                         final long elapsedMs2 = System.currentTimeMillis() - startTime;
266                         logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
267                                         + consumerGroup + " " + clientId);
268
269                         DMaaPResponseBuilder.setNoCacheHeadings(ctx);
270                 
271                         DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
272                         // No IOException thrown during respondOkWithStream, so commit the
273                         // new offsets to all the brokers
274                         c.commitOffsets();
275                         final int sent = coes.getSentCount();
276
277                          metricsSet.consumeTick(sent);
278                      rl.onSend(topic, consumerGroup, clientId, sent);
279                         final long elapsedMs = System.currentTimeMillis() - startTime;
280                         logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for "
281                                         + topic + " " + consumerGroup + " " + clientId + " on to the server "
282                                         + ctx.getRequest().getRemoteHost());
283
284                 } catch (UnavailableException excp) {
285                         logger.warn(excp.getMessage(), excp);
286
287                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
288                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
289                                         errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
290                                         null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost());
291                         LOG.info(errRes.toString());
292                         throw new CambriaApiException(errRes);
293
294                 }  catch (java.util.ConcurrentModificationException excp1) {
295                         LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost());
296                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
297                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
298                                         "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
299                                         Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
300                         logger.info(errRes.toString());
301                         throw new CambriaApiException(errRes);
302                         
303                 } catch (CambriaApiException excp) {
304                         LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId);
305                         
306                         throw excp;
307                 }
308                 catch (Exception excp) {
309                         // System.out.println(excp + "------------------ " + topic+"
310                         // "+consumerGroup+" "+clientId);
311
312                         logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
313                                         + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
314                                         
315                                 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
316
317                         
318                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
319                                         DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
320                                         "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
321                                         Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
322                         logger.info(errRes.toString());
323                         throw new CambriaApiException(errRes);
324                 } finally {
325                         coes = null;
326                         // If no cache, close the consumer now that we're done with it.
327                         boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
328                         String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
329                                         ConsumerFactory.kSetting_EnableCache);
330                         if (null != strkSetting_EnableCache)
331                                 kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
332                         // if
333                         // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,
334                         // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
335                         if (!kSetting_EnableCache && (c != null)) {
336                                 try {
337                                         c.close();
338                                 } catch (Exception e) {
339                                         logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " "
340                                                         + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
341                                                         + " " + e);
342                                 }
343                         }
344                 }
345         }
346
347         /**
348          * @throws missingReqdSetting
349          * 
350          */
351         @Override
352         public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
353                         final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
354                         CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException {
355
356                 // is this user allowed to write to this topic?
357                 final long startMs = System.currentTimeMillis();
358                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
359                 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
360                 boolean isAAFTopic = false;
361
362                 // was this host blacklisted?
363                 final String remoteAddr = Utils.getRemoteAddress(ctx);
364
365                 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
366
367                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
368                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
369                                         "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
370                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
371                                         ctx.getRequest().getRemoteHost(), null, null);
372                         LOG.info(errRes.toString());
373                         throw new CambriaApiException(errRes);
374                 }
375
376                 String topicNameStd = null;
377
378                 // topicNameStd=
379                 
380                 topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
381                                 "enforced.topic.name.AAF");
382                 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
383                                 "metrics.send.cambria.topic");
384                 if (null == metricTopicname)
385                         metricTopicname = "msgrtr.apinode.metrics.dmaap";
386                 boolean topicNameEnforced = false;
387                 if (null != topicNameStd && topic.startsWith(topicNameStd)) {
388                         topicNameEnforced = true;
389                 }
390
391                 // Here check if the user has rights to publish on the topic
392                 // ( This will be called when no auth is added or when UEB API Key
393                 // Authentication is used)
394                 // checkUserWrite(user) method will throw an error when there is no Auth
395                 // header added or when the
396                 // user has no publish rights
397
398                 if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))
399                                 && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
400                         metatopic.checkUserWrite(user);
401                 }
402
403                 // if headers are not provided then user will be null
404                 if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
405                                 && !topic.equalsIgnoreCase(metricTopicname))) {
406                         // the topic name will be sent by the client
407                 
408                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
409                         String permission = aaf.aafPermissionString(topic, "pub");
410                         if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
411                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
412                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
413                                                 errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic
414                                                                 + " on " + permission,
415                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
416                                                 ctx.getRequest().getRemoteHost(), null, null);
417                                 LOG.info(errRes.toString());
418                                 throw new DMaaPAccessDeniedException(errRes);
419                         }
420                         isAAFTopic = true;
421                 }
422
423                 final HttpServletRequest req = ctx.getRequest();
424
425                 // check for chunked input
426                 boolean chunked = false;
427                 if (null != req.getHeader(TRANSFER_ENCODING)) {
428                         chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
429                 }
430                 // get the media type, or set it to a generic value if it wasn't
431                 // provided
432                 String mediaType = req.getContentType();
433                 if (mediaType == null || mediaType.length() == 0) {
434                         mediaType = MimeTypes.kAppGenericBinary;
435                 }
436
437                 if (mediaType.contains("charset=UTF-8")) {
438                         mediaType = mediaType.replace("; charset=UTF-8", "").trim();
439                 }
440
441                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
442                                 "transidUEBtopicreqd");
443                 boolean istransidreqd = false;
444                 if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) {
445                         istransidreqd = true;
446                 }
447
448                 if (isAAFTopic || istransidreqd) {
449                         pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
450                 } else {
451                         pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
452                 }
453                 final long endMs = System.currentTimeMillis();
454                 final long totalMs = endMs - startMs;
455
456                 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
457
458         }
459
460         /**
461          * 
462          * @param ctx
463          * @param topic
464          * @param msg
465          * @param defaultPartition
466          * @param chunked
467          * @param mediaType
468          * @throws ConfigDbException
469          * @throws AccessDeniedException
470          * @throws TopicExistsException
471          * @throws CambriaApiException
472          * @throws IOException
473          */
474         private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
475                         String mediaType)
476                         throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException {
477                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
478                 // setup the event set
479                 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
480
481                 // start processing, building a batch to push to the backend
482                 final long startMs = System.currentTimeMillis();
483                 long count = 0;
484                 long maxEventBatch = 1024L* 16;
485                 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
486                 if (null != batchlen)
487                         maxEventBatch = Long.parseLong(batchlen);
488                 // long maxEventBatch =
489                 
490                 final LinkedList<Publisher.message> batch = new LinkedList<>();
491                 // final ArrayList<KeyedMessage<String, String>> kms = new
492         
493                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
494                 try {
495                         // for each message...
496                         Publisher.message m = null;
497                         while ((m = events.next()) != null) {
498                                 // add the message to the batch
499                                 batch.add(m);
500                                 // final KeyedMessage<String, String> data = new
501                                 // KeyedMessage<String, String>(topic, m.getKey(),
502                         
503                                 // kms.add(data);
504                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
505                                                 m.getMessage());
506
507                                 pms.add(data);
508                                 // check if the batch is full
509                                 final int sizeNow = batch.size();
510                                 if (sizeNow > maxEventBatch) {
511                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
512                                 
513                                         // kms.clear();
514                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
515                                         pms.clear();
516                                         batch.clear();
517                                         metricsSet.publishTick(sizeNow);
518                                         count += sizeNow;
519                                 }
520                         }
521
522                         // send the pending batch
523                         final int sizeNow = batch.size();
524                         if (sizeNow > 0) {
525                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
526                         
527                                 // kms.clear();
528                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
529                                 pms.clear();
530                                 batch.clear();
531                                 metricsSet.publishTick(sizeNow);
532                                 count += sizeNow;
533                         }
534
535                         final long endMs = System.currentTimeMillis();
536                         final long totalMs = endMs - startMs;
537
538                         LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
539                                         + ctx.getRequest().getRemoteHost());
540
541                         // build a response
542                         final JSONObject response = new JSONObject();
543                         response.put("count", count);
544                         response.put("serverTimeMs", totalMs);
545                         DMaaPResponseBuilder.respondOk(ctx, response);
546
547                 } catch (Exception excp) {
548                         int status = HttpStatus.SC_NOT_FOUND;
549                         String errorMsg = null;
550                         if (excp instanceof CambriaApiException) {
551                                 status = ((CambriaApiException) excp).getStatus();
552                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
553                                 JSONObject errObject = new JSONObject(jsonTokener);
554                                 errorMsg = (String) errObject.get("message");
555
556                         }
557                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
558                                         errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
559                                                         + "." + errorMsg,
560                                         null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
561                                         null);
562                         LOG.info(errRes.toString());
563                         throw new CambriaApiException(errRes);
564
565                 }
566         }
567
568         /**
569          * 
570          * @param ctx
571          * @param inputStream
572          * @param topic
573          * @param partitionKey
574          * @param requestTime
575          * @param chunked
576          * @param mediaType
577          * @throws ConfigDbException
578          * @throws AccessDeniedException
579          * @throws TopicExistsException
580          * @throws IOException
581          * @throws CambriaApiException
582          */
583         private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
584                         final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
585                         throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException {
586
587                 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
588
589                 // setup the event set
590                 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
591
592                 // start processing, building a batch to push to the backend
593                 final long startMs = System.currentTimeMillis();
594                 long count = 0;
595                 long maxEventBatch = 1024L * 16;
596                 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
597                 if (null != evenlen)
598                         maxEventBatch = Long.parseLong(evenlen);
599                 // final long maxEventBatch =
600                 
601                 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
602                 // final ArrayList<KeyedMessage<String, String>> kms = new
603                 
604                 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
605                 Publisher.message m = null;
606                 int messageSequence = 1;
607                 Long batchId = 1L;
608                 final boolean transactionEnabled = true;
609                 int publishBatchCount = 0;
610                 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
611
612                 // LOG.warn("Batch Start Id: " +
613         
614                 try {
615                         // for each message...
616                         batchId = DMaaPContext.getBatchID();
617
618                         String responseTransactionId = null;
619
620                         while ((m = events.next()) != null) {
621
622                                 // LOG.warn("Batch Start Id: " +
623                                 
624
625                                 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
626                                                 transactionEnabled);
627                                 messageSequence++;
628
629                         
630                                 batch.add(m);
631
632                                 responseTransactionId = m.getLogDetails().getTransactionId();
633
634                                 JSONObject jsonObject = new JSONObject();
635                                 jsonObject.put("msgWrapMR", m.getMessage());
636                                 jsonObject.put("transactionId", responseTransactionId);
637                                 // final KeyedMessage<String, String> data = new
638                                 // KeyedMessage<String, String>(topic, m.getKey(),
639                         
640                                 
641                                 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
642                                                 m.getMessage());
643
644                                 pms.add(data);
645                                 // check if the batch is full
646                                 final int sizeNow = batch.size();
647                                 if (sizeNow >= maxEventBatch) {
648                                         String startTime = sdf.format(new Date());
649                                         LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
650                                                         + batchId + "]");
651                                         try {
652                                                 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
653                                         
654                                                 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
655                                                 // transactionLogs(batch);
656                                                 for (message msg : batch) {
657                                                         LogDetails logDetails = msg.getLogDetails();
658                                                         LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
659                                                 }
660                                         } catch (Exception excp) {
661
662                                                 int status = HttpStatus.SC_NOT_FOUND;
663                                                 String errorMsg = null;
664                                                 if (excp instanceof CambriaApiException) {
665                                                         status = ((CambriaApiException) excp).getStatus();
666                                                         JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
667                                                         JSONObject errObject = new JSONObject(jsonTokener);
668                                                         errorMsg = (String) errObject.get("message");
669                                                 }
670                                                 ErrorResponse errRes = new ErrorResponse(status,
671                                                                 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
672                                                                 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
673                                                                                 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
674                                                                 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
675                                                                 ctx.getRequest().getRemoteHost(), null, null);
676                                                 LOG.info(errRes.toString());
677                                                 throw new CambriaApiException(errRes);
678                                         }
679                                         pms.clear();
680                                         batch.clear();
681                                         metricsSet.publishTick(sizeNow);
682                                         publishBatchCount = sizeNow;
683                                         count += sizeNow;
684                                         
685                                         String endTime = sdf.format(new Date());
686                                         LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
687                                                         + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
688                                                         + ",Batch End Time=" + endTime + "]");
689                                         batchId = DMaaPContext.getBatchID();
690                                 }
691                         }
692
693                         // send the pending batch
694                         final int sizeNow = batch.size();
695                         if (sizeNow > 0) {
696                                 String startTime = sdf.format(new Date());
697                                 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
698                                                 + batchId + "]");
699                                 try {
700                                         // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
701                                         
702                                         ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
703                                         
704                                         for (message msg : batch) {
705                                                 LogDetails logDetails = msg.getLogDetails();
706                                                 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
707                                         }
708                                 } catch (Exception excp) {
709                                         int status = HttpStatus.SC_NOT_FOUND;
710                                         String errorMsg = null;
711                                         if (excp instanceof CambriaApiException) {
712                                                 status = ((CambriaApiException) excp).getStatus();
713                                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
714                                                 JSONObject errObject = new JSONObject(jsonTokener);
715                                                 errorMsg = (String) errObject.get("message");
716                                         }
717
718                                         ErrorResponse errRes = new ErrorResponse(status,
719                                                         DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
720                                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
721                                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
722                                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
723                                                         ctx.getRequest().getRemoteHost(), null, null);
724                                         LOG.info(errRes.toString());
725                                         throw new CambriaApiException(errRes);
726                                 }
727                                 pms.clear();
728                                 metricsSet.publishTick(sizeNow);
729                                 count += sizeNow;
730                         
731                                 String endTime = sdf.format(new Date());
732                                 publishBatchCount = sizeNow;
733                                 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
734                                                 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
735                                                 + endTime + "]");
736                         }
737
738                         final long endMs = System.currentTimeMillis();
739                         final long totalMs = endMs - startMs;
740
741                         LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
742
743                         if (null != responseTransactionId) {
744                                 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
745                         }
746
747                         // build a response
748                         final JSONObject response = new JSONObject();
749                         response.put("count", count);
750                         response.put("serverTimeMs", totalMs);
751                         DMaaPResponseBuilder.respondOk(ctx, response);
752
753                 } catch (Exception excp) {
754                         int status = HttpStatus.SC_NOT_FOUND;
755                         String errorMsg = null;
756                         if (excp instanceof CambriaApiException) {
757                                 status = ((CambriaApiException) excp).getStatus();
758                                 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
759                                 JSONObject errObject = new JSONObject(jsonTokener);
760                                 errorMsg = (String) errObject.get("message");
761                         }
762
763                         ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
764                                         "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
765                                                         + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
766                                         null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
767                                         ctx.getRequest().getRemoteHost(), null, null);
768                         LOG.info(errRes.toString());
769                         throw new CambriaApiException(errRes);
770                 }
771         }
772
773         /**
774          * 
775          * @param msg
776          * @param topic
777          * @param request
778          * @param messageCreationTime
779          * @param messageSequence
780          * @param batchId
781          * @param transactionEnabled
782          */
783         private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
784                         final String messageCreationTime, final int messageSequence, final Long batchId,
785                         final boolean transactionEnabled) {
786                 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
787                                 transactionEnabled);
788                 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
789                 msg.setTransactionEnabled(transactionEnabled);
790                 msg.setLogDetails(logDetails);
791         }
792
793         /**
794          * 
795          * @author anowarul.islam
796          *
797          */
798         private static class LogWrap {
799                 private final String fId;
800
801                 /**
802                  * constructor initialization
803                  * 
804                  * @param topic
805                  * @param cgroup
806                  * @param cid
807                  */
808                 public LogWrap(String topic, String cgroup, String cid) {
809                         fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
810                 }
811
812                 /**
813                  * 
814                  * @param msg
815                  */
816                 public void info(String msg) {
817                         LOG.info(fId + msg);
818                 }
819
820                 /**
821                  * 
822                  * @param msg
823                  * @param t
824                  */
825                 public void warn(String msg, Exception t) {
826                         LOG.warn(fId + msg, t);
827                 }
828
829         }
830
831         public boolean isTransEnabled() {
832                 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
833                                 "transidUEBtopicreqd");
834                 boolean istransidreqd = false;
835                 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
836                         istransidreqd = true;
837                 }
838
839                 return istransidreqd;
840
841         }
842
843         private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
844                         final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
845                 LogDetails logDetails = new LogDetails();
846                 logDetails.setTopicId(topicName);
847                 logDetails.setMessageTimestamp(messageTimestamp);
848                 logDetails.setPublisherId(Utils.getUserApiKey(request));
849                 logDetails.setPublisherIp(request.getRemoteHost());
850                 logDetails.setMessageBatchId(batchId);
851                 logDetails.setMessageSequence(String.valueOf(messageSequence));
852                 logDetails.setTransactionEnabled(transactionEnabled);
853                 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
854                 logDetails.setServerIp(request.getLocalAddr());
855                 return logDetails;
856         }
857
858         
859          
860         
861          
862         
863          
864         
865
866
867 }