Revert a few Sonar fixes
dmaap/messagerouter/msgrtr.git: src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  org.onap.dmaap
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package org.onap.dmaap.dmf.mr.service.impl;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;

import org.apache.http.HttpStatus;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
import org.onap.dmaap.dmf.mr.backends.MetricsSet;
import org.onap.dmaap.dmf.mr.backends.Publisher;
import org.onap.dmaap.dmf.mr.backends.Publisher.message;
import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
import org.onap.dmaap.dmf.mr.beans.LogDetails;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
import org.onap.dmaap.dmf.mr.exception.ErrorResponse;

import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
import org.onap.dmaap.dmf.mr.service.EventsService;
import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
import org.onap.dmaap.dmf.mr.utils.Utils;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.drumlin.service.standards.MimeTypes;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.security.NsaApiKey;
import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
import com.att.nsa.util.rrConvertor;

/**
 * This class provides the functionality to publish and subscribe messages to
 * Kafka.
 *
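 * A typical caller (for example the REST resource layer) is expected to delegate to
 * this service roughly as in the illustrative sketch below, assuming a fully
 * populated {@code DMaaPContext}; the variable names are placeholders:
 *
 * <pre>{@code
 * eventsService.getEvents(ctx, "myTopic", "myConsumerGroup", "myClientId");
 * eventsService.pushEvents(ctx, "myTopic", request.getInputStream(), null, requestTime);
 * }</pre>
 *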
 * @author Ramkumar Sembaiyam
 *
 */
@Service
public class EventsServiceImpl implements EventsService {
    // private static final Logger LOG =

    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);

    private static final String BATCH_LENGTH = "event.batch.length";
    private static final String TRANSFER_ENCODING = "Transfer-Encoding";
    @Autowired
    private DMaaPErrorMessages errorMessages;

    // @Autowired

    // @Value("${metrics.send.cambria.topic}")

    public DMaaPErrorMessages getErrorMessages() {
        return errorMessages;
    }

    public void setErrorMessages(DMaaPErrorMessages errorMessages) {
        this.errorMessages = errorMessages;
    }

    /**
     * @param ctx
     * @param topic
     * @param consumerGroup
     * @param clientId
     * @throws ConfigDbException
     * @throws TopicExistsException
     * @throws AccessDeniedException
     * @throws UnavailableException
     * @throws CambriaApiException
     * @throws IOException
     * @throws DMaaPAccessDeniedException
     */
    @Override
    public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
            throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
            CambriaApiException, IOException, DMaaPAccessDeniedException {
        final long startTime = System.currentTimeMillis();
        final HttpServletRequest req = ctx.getRequest();

        boolean isAAFTopic = false;
        // was this host blacklisted?
        final String remoteAddr = Utils.getRemoteAddress(ctx);
        if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {

            ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
                    DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                    "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
                    null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
                    ctx.getRequest().getRemoteHost(), null, null);
            LOG.info(errRes.toString());
            throw new CambriaApiException(errRes);
        }

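        // Resolve the per-request message limit and timeout: "limit" and "timeout" query
        // parameters override the configured defaults (no limit, configured "timeout" property).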
        int limit = CambriaConstants.kNoLimit;
        if (req.getParameter("limit") != null) {
            limit = Integer.parseInt(req.getParameter("limit"));
        }

        int timeoutMs = CambriaConstants.kNoTimeout;
        String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
        if (strtimeoutMS != null)
            timeoutMs = Integer.parseInt(strtimeoutMS);
        // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",

        if (req.getParameter("timeout") != null) {
            timeoutMs = Integer.parseInt(req.getParameter("timeout"));
        }

        // By default no filter is applied if filter is not passed as a
        // parameter in the request URI
        String topicFilter = CambriaConstants.kNoFilter;
        if (null != req.getParameter("filter")) {
            topicFilter = req.getParameter("filter");
        }
        // pretty prints the messages on new lines
        String prettyval = "0";
        String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
        if (null != strPretty)
            prettyval = strPretty;

        String metaval = "0";
        String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
        if (null != strmeta)
            metaval = strmeta;

        final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
        // withMeta to print offset along with message
        final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);

        final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
        logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter
                + " from Remote host " + ctx.getRequest().getRemoteHost());

        // is this user allowed to read this topic?
        final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
        final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);

        if (metatopic == null) {
            // no such topic.
            ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
                    DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
                    errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
                    topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost());
            LOG.info(errRes.toString());
            throw new CambriaApiException(errRes);
        }
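        // Resolve the metrics topic name and the enforced AAF topic name prefix from
        // configuration; topics matching that prefix always require AAF authorization.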
        String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "metrics.send.cambria.topic");
        if (null == metricTopicname)
            metricTopicname = "msgrtr.apinode.metrics.dmaap";

        boolean topicNameEnforced = false;
        String topicNameStd = null;
        topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
                "enforced.topic.name.AAF");
        if (null != topicNameStd && topic.startsWith(topicNameStd)) {
            topicNameEnforced = true;
        }

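        // Without an Authorization header (and excluding the metrics topic), fall back to
        // the API-key ACL configured on the topic itself.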
        if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
            if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) {
                // check permissions
                metatopic.checkUserRead(user);
            }
        }
        // if headers are not provided then user will be null
        if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization"))) {
            // the topic name will be sent by the client

            DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
            String permission = aaf.aafPermissionString(topic, "sub");
            if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
                ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
                        DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                        errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on "
                                + permission,
                        null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId,
                        ctx.getRequest().getRemoteHost());
                LOG.info(errRes.toString());
                throw new DMaaPAccessDeniedException(errRes);

            }
            isAAFTopic = true;
        }
        final long elapsedMs1 = System.currentTimeMillis() - startTime;
        logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
                + " " + clientId);
        Consumer c = null;

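        // Determine this node's host id; fall back to the local canonical host name when
        // "clusterhostid" is not configured.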
        String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "clusterhostid");
        if (null == lhostId) {
            try {
                lhostId = InetAddress.getLocalHost().getCanonicalHostName();
            } catch (UnknownHostException e) {
                LOG.info("UnknownHostException occurred while getting the host id");
            }

        }
        CambriaOutboundEventStream coes = null;
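        // Build an outbound event stream over the consumer, write it to the response and,
        // only after a successful write, commit the new offsets back to the brokers.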
        try {
            final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
            final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
            rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost());
            c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
                    ctx.getRequest().getRemoteHost());
            coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
                    .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
            coes.setDmaapContext(ctx);
            coes.setTopic(metatopic);
            if (isTransEnabled() || isAAFTopic) {
                coes.setTransEnabled(true);
            } else {
                coes.setTransEnabled(false);
            }
            coes.setTopicStyle(isAAFTopic);
            final long elapsedMs2 = System.currentTimeMillis() - startTime;
            logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
                    + consumerGroup + " " + clientId);

            DMaaPResponseBuilder.setNoCacheHeadings(ctx);

            DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
            // No IOException thrown during respondOkWithStream, so commit the
            // new offsets to all the brokers
            c.commitOffsets();
            final int sent = coes.getSentCount();

            metricsSet.consumeTick(sent);
            rl.onSend(topic, consumerGroup, clientId, sent);
            final long elapsedMs = System.currentTimeMillis() - startTime;
            logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for "
                    + topic + " " + consumerGroup + " " + clientId + " on to the server "
                    + ctx.getRequest().getRemoteHost());

        } catch (UnavailableException excp) {
            logger.warn(excp.getMessage(), excp);

            ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
                    DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
                    errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
                    null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost());
            LOG.info(errRes.toString());
            throw new CambriaApiException(errRes);

        } catch (java.util.ConcurrentModificationException excp1) {
            LOG.info(excp1.getMessage() + " on " + topic + " " + consumerGroup + " ****** " + clientId
                    + " from Remote " + ctx.getRequest().getRemoteHost());
            ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
                    DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
                    "Couldn't respond to client, possibly due to consumer requests from more than one server. "
                            + "Please contact the MR team if this issue occurs continuously", null,
                    Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
            logger.info(errRes.toString());
            throw new CambriaApiException(errRes);

        } catch (CambriaApiException excp) {
            LOG.info(excp.getMessage() + " on " + topic + " " + consumerGroup + " ****** " + clientId);

            throw excp;
        }
        catch (Exception excp) {
            // System.out.println(excp + "------------------ " + topic+"
            // "+consumerGroup+" "+clientId);

            logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
                    + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);

            ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);

            ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
                    DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
                    "Couldn't respond to client, closing cambria consumer " + excp.getMessage(), null,
                    Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
            logger.info(errRes.toString());
            throw new CambriaApiException(errRes);
        } finally {
            coes = null;
            // If no cache, close the consumer now that we're done with it.
            boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
            String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                    ConsumerFactory.kSetting_EnableCache);
            if (null != strkSetting_EnableCache)
                kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
            // if
            // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,
            // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
            if (!kSetting_EnableCache && (c != null)) {
                try {
                    c.close();
                } catch (Exception e) {
                    logger.info("Exception occurred in getEvents finally block while closing the consumer " + " "
                            + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
                            + " " + e);
                }
            }
        }
    }

    /**
     * @throws missingReqdSetting
     *
     */
    @Override
    public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
            final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
            CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException {

        // is this user allowed to write to this topic?
        final long startMs = System.currentTimeMillis();
        final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
        final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
        boolean isAAFTopic = false;

        // was this host blacklisted?
        final String remoteAddr = Utils.getRemoteAddress(ctx);

        if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {

            ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
                    DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                    "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
                    null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
                    ctx.getRequest().getRemoteHost(), null, null);
            LOG.info(errRes.toString());
            throw new CambriaApiException(errRes);
        }

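        // Resolve the enforced AAF topic name prefix and the metrics topic name; publishes
        // to prefixed topics always require AAF authorization, while the metrics topic is exempt.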
        String topicNameStd = null;

        // topicNameStd=

        topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
                "enforced.topic.name.AAF");
        String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "metrics.send.cambria.topic");
        if (null == metricTopicname)
            metricTopicname = "msgrtr.apinode.metrics.dmaap";
        boolean topicNameEnforced = false;
        if (null != topicNameStd && topic.startsWith(topicNameStd)) {
            topicNameEnforced = true;
        }

        // Check whether the user has rights to publish on the topic. This path is used when
        // no auth header is added or when UEB API Key authentication is used;
        // checkUserWrite(user) throws when there is no Auth header or the user has no
        // publish rights.

        if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))
                && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
            metatopic.checkUserWrite(user);
        }

        // if headers are not provided then user will be null
        if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
                && !topic.equalsIgnoreCase(metricTopicname))) {
            // the topic name will be sent by the client

            DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
            String permission = aaf.aafPermissionString(topic, "pub");
            if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
                ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
                        DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
                        errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic
                                + " on " + permission,
                        null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
                        ctx.getRequest().getRemoteHost(), null, null);
                LOG.info(errRes.toString());
                throw new DMaaPAccessDeniedException(errRes);
            }
            isAAFTopic = true;
        }

        final HttpServletRequest req = ctx.getRequest();

        // check for chunked input
        boolean chunked = false;
        if (null != req.getHeader(TRANSFER_ENCODING)) {
            chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
        }
        // get the media type, or set it to a generic value if it wasn't
        // provided
        String mediaType = req.getContentType();
        if (mediaType == null || mediaType.length() == 0) {
            mediaType = MimeTypes.kAppGenericBinary;
        }

        if (mediaType.contains("charset=UTF-8")) {
            mediaType = mediaType.replace("; charset=UTF-8", "").trim();
        }

        String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "transidUEBtopicreqd");
        boolean istransidreqd = false;
        if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) {
            istransidreqd = true;
        }

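        // Use the transactional publish path when the topic is AAF protected or transaction
        // ids are mandated by configuration; otherwise publish plain batches.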
        if (isAAFTopic || istransidreqd) {
            pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
        } else {
            pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
        }
        final long endMs = System.currentTimeMillis();
        final long totalMs = endMs - startMs;

        LOG.info("Overall Response time - Published msgs in " + totalMs + " ms for topic " + topic);

    }

    /**
     *
     * @param ctx
     * @param topic
     * @param msg
     * @param defaultPartition
     * @param chunked
     * @param mediaType
     * @throws ConfigDbException
     * @throws AccessDeniedException
     * @throws TopicExistsException
     * @throws CambriaApiException
     * @throws IOException
     */
    private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
            String mediaType)
            throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException {
        final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
        // setup the event set
        final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);

        // start processing, building a batch to push to the backend
        final long startMs = System.currentTimeMillis();
        long count = 0;
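        // The batch size is configurable via the "event.batch.length" property and defaults
        // to 16 * 1024 messages per send.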
        long maxEventBatch = 1024L * 16;
        String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
        if (null != batchlen)
            maxEventBatch = Long.parseLong(batchlen);
        // long maxEventBatch =

        final LinkedList<Publisher.message> batch = new LinkedList<>();
        // final ArrayList<KeyedMessage<String, String>> kms = new

        final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
        try {
            // for each message...
            Publisher.message m = null;
            while ((m = events.next()) != null) {
                // add the message to the batch
                batch.add(m);
                // final KeyedMessage<String, String> data = new
                // KeyedMessage<String, String>(topic, m.getKey(),

                // kms.add(data);
                final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
                        m.getMessage());

                pms.add(data);
                // check if the batch is full
                final int sizeNow = batch.size();
                if (sizeNow > maxEventBatch) {
                    // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,

                    // kms.clear();
                    ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
                    pms.clear();
                    batch.clear();
                    metricsSet.publishTick(sizeNow);
                    count += sizeNow;
                }
            }

            // send the pending batch
            final int sizeNow = batch.size();
            if (sizeNow > 0) {
                // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,

                // kms.clear();
                ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
                pms.clear();
                batch.clear();
                metricsSet.publishTick(sizeNow);
                count += sizeNow;
            }

            final long endMs = System.currentTimeMillis();
            final long totalMs = endMs - startMs;

            LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
                    + ctx.getRequest().getRemoteHost());

            // build a response
            final JSONObject response = new JSONObject();
            response.put("count", count);
            response.put("serverTimeMs", totalMs);
            DMaaPResponseBuilder.respondOk(ctx, response);

        } catch (Exception excp) {
            int status = HttpStatus.SC_NOT_FOUND;
            String errorMsg = null;
            if (excp instanceof CambriaApiException) {
                status = ((CambriaApiException) excp).getStatus();
                JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
                JSONObject errObject = new JSONObject(jsonTokener);
                errorMsg = (String) errObject.get("message");

            }
            ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
                    errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
                            + "." + errorMsg,
                    null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
                    null);
            LOG.info(errRes.toString());
            throw new CambriaApiException(errRes);

        }
    }

    /**
     *
     * @param ctx
     * @param inputStream
     * @param topic
     * @param partitionKey
     * @param requestTime
     * @param chunked
     * @param mediaType
     * @throws ConfigDbException
     * @throws AccessDeniedException
     * @throws TopicExistsException
     * @throws IOException
     * @throws CambriaApiException
     */
    private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
            final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
            throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException {

        final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();

        // setup the event set
        final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);

        // start processing, building a batch to push to the backend
        final long startMs = System.currentTimeMillis();
        long count = 0;
        long maxEventBatch = 1024L * 16;
        String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
        if (null != evenlen)
            maxEventBatch = Long.parseLong(evenlen);
        // final long maxEventBatch =

        final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
        // final ArrayList<KeyedMessage<String, String>> kms = new

        final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
        Publisher.message m = null;
        int messageSequence = 1;
        Long batchId = 1L;
        final boolean transactionEnabled = true;
        int publishBatchCount = 0;
        SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
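        // Every message is stamped with transaction details (batch id, message sequence,
        // publisher identity) before it is queued, and each batch send is logged with its
        // start/end time and size.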

        // LOG.warn("Batch Start Id: " +

        try {
            // for each message...
            batchId = DMaaPContext.getBatchID();

            String responseTransactionId = null;

            while ((m = events.next()) != null) {

                // LOG.warn("Batch Start Id: " +

                addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
                        transactionEnabled);
                messageSequence++;

                batch.add(m);

                responseTransactionId = m.getLogDetails().getTransactionId();

                JSONObject jsonObject = new JSONObject();
                jsonObject.put("msgWrapMR", m.getMessage());
                jsonObject.put("transactionId", responseTransactionId);
                // final KeyedMessage<String, String> data = new
                // KeyedMessage<String, String>(topic, m.getKey(),

                final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
                        m.getMessage());

                pms.add(data);
                // check if the batch is full
                final int sizeNow = batch.size();
                if (sizeNow >= maxEventBatch) {
                    String startTime = sdf.format(new Date());
                    LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
                            + batchId + "]");
                    try {
                        // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,

                        ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
                        // transactionLogs(batch);
                        for (message msg : batch) {
                            LogDetails logDetails = msg.getLogDetails();
                            LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
                        }
                    } catch (Exception excp) {

                        int status = HttpStatus.SC_NOT_FOUND;
                        String errorMsg = null;
                        if (excp instanceof CambriaApiException) {
                            status = ((CambriaApiException) excp).getStatus();
                            JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
                            JSONObject errObject = new JSONObject(jsonTokener);
                            errorMsg = (String) errObject.get("message");
                        }
                        ErrorResponse errRes = new ErrorResponse(status,
                                DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
                                "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
                                        + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
                                null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
                                ctx.getRequest().getRemoteHost(), null, null);
                        LOG.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                    }
                    pms.clear();
                    batch.clear();
                    metricsSet.publishTick(sizeNow);
                    publishBatchCount = sizeNow;
                    count += sizeNow;

                    String endTime = sdf.format(new Date());
                    LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
                            + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
                            + ",Batch End Time=" + endTime + "]");
                    batchId = DMaaPContext.getBatchID();
                }
            }

            // send the pending batch
            final int sizeNow = batch.size();
            if (sizeNow > 0) {
                String startTime = sdf.format(new Date());
                LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
                        + batchId + "]");
                try {
                    // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,

                    ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);

                    for (message msg : batch) {
                        LogDetails logDetails = msg.getLogDetails();
                        LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
                    }
                } catch (Exception excp) {
                    int status = HttpStatus.SC_NOT_FOUND;
                    String errorMsg = null;
                    if (excp instanceof CambriaApiException) {
                        status = ((CambriaApiException) excp).getStatus();
                        JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
                        JSONObject errObject = new JSONObject(jsonTokener);
                        errorMsg = (String) errObject.get("message");
                    }

                    ErrorResponse errRes = new ErrorResponse(status,
                            DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
                            "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
                                    + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
                            null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
                            ctx.getRequest().getRemoteHost(), null, null);
                    LOG.info(errRes.toString());
                    throw new CambriaApiException(errRes);
                }
                pms.clear();
                metricsSet.publishTick(sizeNow);
                count += sizeNow;

                String endTime = sdf.format(new Date());
                publishBatchCount = sizeNow;
                LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
                        + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
                        + endTime + "]");
            }

            final long endMs = System.currentTimeMillis();
            final long totalMs = endMs - startMs;

            LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);

            if (null != responseTransactionId) {
                ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
            }

            // build a response
            final JSONObject response = new JSONObject();
            response.put("count", count);
            response.put("serverTimeMs", totalMs);
            DMaaPResponseBuilder.respondOk(ctx, response);

        } catch (Exception excp) {
            int status = HttpStatus.SC_NOT_FOUND;
            String errorMsg = null;
            if (excp instanceof CambriaApiException) {
                status = ((CambriaApiException) excp).getStatus();
                JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
                JSONObject errObject = new JSONObject(jsonTokener);
                errorMsg = (String) errObject.get("message");
            }

            ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
                    "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
                            + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
                    null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
                    ctx.getRequest().getRemoteHost(), null, null);
            LOG.info(errRes.toString());
            throw new CambriaApiException(errRes);
        }
    }

    /**
     *
     * @param msg
     * @param topic
     * @param request
     * @param messageCreationTime
     * @param messageSequence
     * @param batchId
     * @param transactionEnabled
     */
    private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
            final String messageCreationTime, final int messageSequence, final Long batchId,
            final boolean transactionEnabled) {
        LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
                transactionEnabled);
        logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
        msg.setTransactionEnabled(transactionEnabled);
        msg.setLogDetails(logDetails);
    }

    /**
     *
     * @author anowarul.islam
     *
     */
    private static class LogWrap {
        private final String fId;

        /**
         * constructor initialization
         *
         * @param topic
         * @param cgroup
         * @param cid
         */
        public LogWrap(String topic, String cgroup, String cid) {
            fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
        }

        /**
         *
         * @param msg
         */
        public void info(String msg) {
            LOG.info(fId + msg);
        }

        /**
         *
         * @param msg
         * @param t
         */
        public void warn(String msg, Exception t) {
            LOG.warn(fId + msg, t);
        }

    }

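    /**
     * Returns true when the "transidUEBtopicreqd" property is set to "true", i.e. when
     * transaction ids are required for every topic.
     */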
    public boolean isTransEnabled() {
        String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                "transidUEBtopicreqd");
        boolean istransidreqd = false;
        if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
            istransidreqd = true;
        }

        return istransidreqd;

    }

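    /**
     * Builds the per-message {@link LogDetails} record (topic, publisher identity, batch id,
     * message sequence and timestamps) used for transaction logging.
     */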
    private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
            final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
        LogDetails logDetails = new LogDetails();
        logDetails.setTopicId(topicName);
        logDetails.setMessageTimestamp(messageTimestamp);
        logDetails.setPublisherId(Utils.getUserApiKey(request));
        logDetails.setPublisherIp(request.getRemoteHost());
        logDetails.setMessageBatchId(batchId);
        logDetails.setMessageSequence(String.valueOf(messageSequence));
        logDetails.setTransactionEnabled(transactionEnabled);
        logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
        logDetails.setServerIp(request.getLocalAddr());
        return logDetails;
    }

}