1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.net.InetAddress;
27 import java.net.UnknownHostException;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.LinkedList;
34 import java.util.Properties;
36 import javax.servlet.http.HttpServletRequest;
37 import javax.ws.rs.core.MediaType;
39 import org.apache.http.HttpStatus;
40 import org.apache.kafka.clients.consumer.ConsumerRecord;
41 import org.apache.kafka.clients.consumer.ConsumerRecords;
42 import org.apache.kafka.clients.consumer.KafkaConsumer;
43 import org.apache.kafka.clients.producer.ProducerRecord;
44 import org.apache.kafka.common.errors.TopicExistsException;
45 import org.json.JSONObject;
46 import org.json.JSONTokener;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.beans.factory.annotation.Qualifier;
49 import org.springframework.stereotype.Service;
51 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
52 import org.onap.dmaap.dmf.mr.CambriaApiException;
53 import org.onap.dmaap.dmf.mr.backends.Consumer;
54 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
55 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
56 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
57 import org.onap.dmaap.dmf.mr.backends.Publisher;
58 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
59 import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
60 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
61 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
62 import org.onap.dmaap.dmf.mr.beans.LogDetails;
63 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
64 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
65 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
66 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
67 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
69 import org.onap.dmaap.dmf.mr.metabroker.Topic;
70 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
71 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
72 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
73 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
74 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
75 import org.onap.dmaap.dmf.mr.service.EventsService;
76 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
77 import org.onap.dmaap.dmf.mr.utils.Utils;
78 import com.att.eelf.configuration.EELFLogger;
79 import com.att.eelf.configuration.EELFManager;
80 import com.att.nsa.configs.ConfigDbException;
81 import com.att.nsa.drumlin.service.standards.MimeTypes;
82 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
83 import com.att.nsa.security.NsaApiKey;
84 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
85 import com.att.nsa.util.rrConvertor;
 * This class provides the functionality to publish and subscribe messages to
 * @author Ramkumar Sembaiyam
95 public class EventsServiceImpl implements EventsService {
96 // private static final Logger LOG =
98 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
100 private static final String BATCH_LENGTH = "event.batch.length";
101 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
103 private DMaaPErrorMessages errorMessages;
108 // @Value("${metrics.send.cambria.topic}")
111 public DMaaPErrorMessages getErrorMessages() {
112 return errorMessages;
115 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
116 this.errorMessages = errorMessages;
122 * @param consumerGroup
124 * @throws ConfigDbException,
125 * TopicExistsException, AccessDeniedException,
126 * UnavailableException, CambriaApiException, IOException
131 public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
132 throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
133 CambriaApiException, IOException, DMaaPAccessDeniedException {
134 final long startTime = System.currentTimeMillis();
135 final HttpServletRequest req = ctx.getRequest();
137 boolean isAAFTopic = false;
138 // was this host blacklisted?
139 final String remoteAddr = Utils.getRemoteAddress(ctx);
140 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
142 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
143 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
144 "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
145 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
146 ctx.getRequest().getRemoteHost(), null, null);
147 LOG.info(errRes.toString());
148 throw new CambriaApiException(errRes);
151 int limit = CambriaConstants.kNoLimit;
152 if (req.getParameter("limit") != null) {
153 limit = Integer.parseInt(req.getParameter("limit"));
156 int timeoutMs = CambriaConstants.kNoTimeout;
157 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
158 if (strtimeoutMS != null)
159 timeoutMs = Integer.parseInt(strtimeoutMS);
160 // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
162 if (req.getParameter("timeout") != null) {
163 timeoutMs = Integer.parseInt(req.getParameter("timeout"));
166 // By default no filter is applied if filter is not passed as a
167 // parameter in the request URI
168 String topicFilter = CambriaConstants.kNoFilter;
169 if (null != req.getParameter("filter")) {
170 topicFilter = req.getParameter("filter");
172 // pretty to print the messaages in new line
173 String prettyval = "0";
174 String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
175 if (null != strPretty)
176 prettyval = strPretty;
178 String metaval = "0";
179 String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
183 final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
184 // withMeta to print offset along with message
185 final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
187 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
188 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
190 // is this user allowed to read this topic?
191 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
192 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
194 if (metatopic == null) {
196 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
197 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
198 errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
199 topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost());
200 LOG.info(errRes.toString());
201 throw new CambriaApiException(errRes);
203 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
204 "metrics.send.cambria.topic");
205 if (null == metricTopicname)
206 metricTopicname = "msgrtr.apinode.metrics.dmaap";
208 boolean topicNameEnforced = false;
209 String topicNameStd = null;
210 topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
211 "enforced.topic.name.AAF");
212 if (null != topicNameStd && topic.startsWith(topicNameStd)) {
213 topicNameEnforced = true;
216 if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
217 if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) {
219 metatopic.checkUserRead(user);
222 // if headers are not provided then user will be null
223 if (topicNameEnforced&&user == null && null != ctx.getRequest().getHeader("Authorization")) {
224 // the topic name will be sent by the client
226 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
227 String permission = aaf.aafPermissionString(topic, "sub");
228 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
229 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
230 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
231 errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on "
233 null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId,
234 ctx.getRequest().getRemoteHost());
235 LOG.info(errRes.toString());
236 throw new DMaaPAccessDeniedException(errRes);
241 final long elapsedMs1 = System.currentTimeMillis() - startTime;
242 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
246 String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
248 if (null == lhostId) {
250 lhostId = InetAddress.getLocalHost().getCanonicalHostName();
251 } catch (UnknownHostException e) {
252 LOG.info("Unknown Host Exception error occured while getting getting hostid");
256 CambriaOutboundEventStream coes = null;
258 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
259 final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
260 rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost());
261 c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
262 ctx.getRequest().getRemoteHost());
263 coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
264 .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
265 coes.setDmaapContext(ctx);
266 coes.setTopic(metatopic);
267 if (isTransEnabled() || isAAFTopic) {
268 coes.setTransEnabled(true);
270 coes.setTransEnabled(false);
272 coes.setTopicStyle(isAAFTopic);
273 final long elapsedMs2 = System.currentTimeMillis() - startTime;
274 logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
275 + consumerGroup + " " + clientId);
277 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
279 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
280 // No IOException thrown during respondOkWithStream, so commit the
281 // new offsets to all the brokers
283 final int sent = coes.getSentCount();
285 metricsSet.consumeTick(sent);
286 rl.onSend(topic, consumerGroup, clientId, sent);
287 final long elapsedMs = System.currentTimeMillis() - startTime;
288 logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for "
289 + topic + " " + consumerGroup + " " + clientId + " on to the server "
290 + ctx.getRequest().getRemoteHost());
292 } catch (UnavailableException excp) {
293 logger.warn(excp.getMessage(), excp);
295 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
296 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
297 errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
298 null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost());
299 LOG.info(errRes.toString());
300 throw new CambriaApiException(errRes);
302 } catch (java.util.ConcurrentModificationException excp1) {
303 LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost());
304 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
305 DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
306 "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
307 Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
308 logger.info(errRes.toString());
309 throw new CambriaApiException(errRes);
311 } catch (CambriaApiException excp) {
312 LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId);
316 catch (Exception excp) {
317 // System.out.println(excp + "------------------ " + topic+"
318 // "+consumerGroup+" "+clientId);
320 logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
321 + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
323 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
326 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
327 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
328 "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
329 Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
330 logger.info(errRes.toString());
331 throw new CambriaApiException(errRes);
334 // If no cache, close the consumer now that we're done with it.
335 boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
336 String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
337 ConsumerFactory.kSetting_EnableCache);
338 if (null != strkSetting_EnableCache)
339 kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
341 // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,
342 // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
343 if (!kSetting_EnableCache && (c != null)) {
346 } catch (Exception e) {
347 logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " "
348 + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
356 * @throws missingReqdSetting
360 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
361 final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
362 CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException {
364 // is this user allowed to write to this topic?
365 final long startMs = System.currentTimeMillis();
366 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
367 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
368 boolean isAAFTopic = false;
370 // was this host blacklisted?
371 final String remoteAddr = Utils.getRemoteAddress(ctx);
373 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
375 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
376 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
377 "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
378 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
379 ctx.getRequest().getRemoteHost(), null, null);
380 LOG.info(errRes.toString());
381 throw new CambriaApiException(errRes);
384 String topicNameStd = null;
388 topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
389 "enforced.topic.name.AAF");
390 String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
391 "metrics.send.cambria.topic");
392 if (null == metricTopicname)
393 metricTopicname = "msgrtr.apinode.metrics.dmaap";
394 boolean topicNameEnforced = false;
395 if (null != topicNameStd && topic.startsWith(topicNameStd)) {
396 topicNameEnforced = true;
399 // Here check if the user has rights to publish on the topic
400 // ( This will be called when no auth is added or when UEB API Key
401 // Authentication is used)
402 // checkUserWrite(user) method will throw an error when there is no Auth
403 // header added or when the
404 // user has no publish rights
406 if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))
407 && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
408 metatopic.checkUserWrite(user);
411 // if headers are not provided then user will be null
412 if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
413 && !topic.equalsIgnoreCase(metricTopicname))) {
414 // the topic name will be sent by the client
416 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
417 String permission = aaf.aafPermissionString(topic, "pub");
418 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
419 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
420 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
421 errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic
422 + " on " + permission,
423 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
424 ctx.getRequest().getRemoteHost(), null, null);
425 LOG.info(errRes.toString());
426 throw new DMaaPAccessDeniedException(errRes);
431 final HttpServletRequest req = ctx.getRequest();
433 // check for chunked input
434 boolean chunked = false;
435 if (null != req.getHeader(TRANSFER_ENCODING)) {
436 chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
438 // get the media type, or set it to a generic value if it wasn't
440 String mediaType = req.getContentType();
441 if (mediaType == null || mediaType.length() == 0) {
442 mediaType = MimeTypes.kAppGenericBinary;
445 if (mediaType.contains("charset=UTF-8")) {
446 mediaType = mediaType.replace("; charset=UTF-8", "").trim();
449 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
450 "transidUEBtopicreqd");
451 boolean istransidreqd = false;
452 if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) {
453 istransidreqd = true;
456 if (isAAFTopic || istransidreqd) {
457 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
459 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
461 final long endMs = System.currentTimeMillis();
462 final long totalMs = endMs - startMs;
464 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
473 * @param defaultPartition
476 * @throws ConfigDbException
477 * @throws AccessDeniedException
478 * @throws TopicExistsException
479 * @throws CambriaApiException
480 * @throws IOException
482 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
484 throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException {
485 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
486 // setup the event set
487 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
489 // start processing, building a batch to push to the backend
490 final long startMs = System.currentTimeMillis();
492 long maxEventBatch = 1024L* 16;
493 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
494 if (null != batchlen)
495 maxEventBatch = Long.parseLong(batchlen);
496 // long maxEventBatch =
498 final LinkedList<Publisher.message> batch = new LinkedList<>();
499 // final ArrayList<KeyedMessage<String, String>> kms = new
501 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
503 // for each message...
504 Publisher.message m = null;
505 while ((m = events.next()) != null) {
506 // add the message to the batch
508 // final KeyedMessage<String, String> data = new
509 // KeyedMessage<String, String>(topic, m.getKey(),
512 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
516 // check if the batch is full
517 final int sizeNow = batch.size();
518 if (sizeNow > maxEventBatch) {
519 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
522 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
525 metricsSet.publishTick(sizeNow);
530 // send the pending batch
531 final int sizeNow = batch.size();
533 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
536 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
539 metricsSet.publishTick(sizeNow);
543 final long endMs = System.currentTimeMillis();
544 final long totalMs = endMs - startMs;
546 LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
547 + ctx.getRequest().getRemoteHost());
550 final JSONObject response = new JSONObject();
551 response.put("count", count);
552 response.put("serverTimeMs", totalMs);
553 DMaaPResponseBuilder.respondOk(ctx, response);
555 } catch (Exception excp) {
556 int status = HttpStatus.SC_NOT_FOUND;
557 String errorMsg = null;
558 if (excp instanceof CambriaApiException) {
559 status = ((CambriaApiException) excp).getStatus();
560 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
561 JSONObject errObject = new JSONObject(jsonTokener);
562 errorMsg = (String) errObject.get("message");
565 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
566 errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
568 null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
570 LOG.info(errRes.toString());
571 throw new CambriaApiException(errRes);
581 * @param partitionKey
585 * @throws ConfigDbException
586 * @throws AccessDeniedException
587 * @throws TopicExistsException
588 * @throws IOException
589 * @throws CambriaApiException
591 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
592 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
593 throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException {
595 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
597 // setup the event set
598 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
600 // start processing, building a batch to push to the backend
601 final long startMs = System.currentTimeMillis();
603 long maxEventBatch = 1024L * 16;
604 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
606 maxEventBatch = Long.parseLong(evenlen);
607 // final long maxEventBatch =
609 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
610 // final ArrayList<KeyedMessage<String, String>> kms = new
612 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
613 Publisher.message m = null;
614 int messageSequence = 1;
616 final boolean transactionEnabled = true;
617 int publishBatchCount = 0;
618 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
620 // LOG.warn("Batch Start Id: " +
623 // for each message...
624 batchId = DMaaPContext.getBatchID();
626 String responseTransactionId = null;
628 while ((m = events.next()) != null) {
630 // LOG.warn("Batch Start Id: " +
633 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
640 responseTransactionId = m.getLogDetails().getTransactionId();
642 JSONObject jsonObject = new JSONObject();
643 jsonObject.put("msgWrapMR", m.getMessage());
644 jsonObject.put("transactionId", responseTransactionId);
645 // final KeyedMessage<String, String> data = new
646 // KeyedMessage<String, String>(topic, m.getKey(),
649 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
653 // check if the batch is full
654 final int sizeNow = batch.size();
655 if (sizeNow >= maxEventBatch) {
656 String startTime = sdf.format(new Date());
657 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
660 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
662 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
663 // transactionLogs(batch);
664 for (message msg : batch) {
665 LogDetails logDetails = msg.getLogDetails();
666 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
668 } catch (Exception excp) {
670 int status = HttpStatus.SC_NOT_FOUND;
671 String errorMsg = null;
672 if (excp instanceof CambriaApiException) {
673 status = ((CambriaApiException) excp).getStatus();
674 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
675 JSONObject errObject = new JSONObject(jsonTokener);
676 errorMsg = (String) errObject.get("message");
678 ErrorResponse errRes = new ErrorResponse(status,
679 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
680 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
681 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
682 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
683 ctx.getRequest().getRemoteHost(), null, null);
684 LOG.info(errRes.toString());
685 throw new CambriaApiException(errRes);
689 metricsSet.publishTick(sizeNow);
690 publishBatchCount = sizeNow;
693 String endTime = sdf.format(new Date());
694 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
695 + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
696 + ",Batch End Time=" + endTime + "]");
697 batchId = DMaaPContext.getBatchID();
701 // send the pending batch
702 final int sizeNow = batch.size();
704 String startTime = sdf.format(new Date());
705 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
708 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
710 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
712 for (message msg : batch) {
713 LogDetails logDetails = msg.getLogDetails();
714 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
716 } catch (Exception excp) {
717 int status = HttpStatus.SC_NOT_FOUND;
718 String errorMsg = null;
719 if (excp instanceof CambriaApiException) {
720 status = ((CambriaApiException) excp).getStatus();
721 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
722 JSONObject errObject = new JSONObject(jsonTokener);
723 errorMsg = (String) errObject.get("message");
726 ErrorResponse errRes = new ErrorResponse(status,
727 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
728 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
729 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
730 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
731 ctx.getRequest().getRemoteHost(), null, null);
732 LOG.info(errRes.toString());
733 throw new CambriaApiException(errRes);
736 metricsSet.publishTick(sizeNow);
739 String endTime = sdf.format(new Date());
740 publishBatchCount = sizeNow;
741 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
742 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
746 final long endMs = System.currentTimeMillis();
747 final long totalMs = endMs - startMs;
749 LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
751 if (null != responseTransactionId) {
752 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
756 final JSONObject response = new JSONObject();
757 response.put("count", count);
758 response.put("serverTimeMs", totalMs);
759 DMaaPResponseBuilder.respondOk(ctx, response);
761 } catch (Exception excp) {
762 int status = HttpStatus.SC_NOT_FOUND;
763 String errorMsg = null;
764 if (excp instanceof CambriaApiException) {
765 status = ((CambriaApiException) excp).getStatus();
766 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
767 JSONObject errObject = new JSONObject(jsonTokener);
768 errorMsg = (String) errObject.get("message");
771 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
772 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
773 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
774 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
775 ctx.getRequest().getRemoteHost(), null, null);
776 LOG.info(errRes.toString());
777 throw new CambriaApiException(errRes);
786 * @param messageCreationTime
787 * @param messageSequence
789 * @param transactionEnabled
791 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
792 final String messageCreationTime, final int messageSequence, final Long batchId,
793 final boolean transactionEnabled) {
794 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
796 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
797 msg.setTransactionEnabled(transactionEnabled);
798 msg.setLogDetails(logDetails);
803 * @author anowarul.islam
806 private static class LogWrap {
807 private final String fId;
810 * constructor initialization
816 public LogWrap(String topic, String cgroup, String cid) {
817 fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
824 public void info(String msg) {
833 public void warn(String msg, Exception t) {
834 LOG.warn(fId + msg, t);
839 public boolean isTransEnabled() {
840 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
841 "transidUEBtopicreqd");
842 boolean istransidreqd = false;
843 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
844 istransidreqd = true;
847 return istransidreqd;
851 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
852 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
853 LogDetails logDetails = new LogDetails();
854 logDetails.setTopicId(topicName);
855 logDetails.setMessageTimestamp(messageTimestamp);
856 logDetails.setPublisherId(Utils.getUserApiKey(request));
857 logDetails.setPublisherIp(request.getRemoteHost());
858 logDetails.setMessageBatchId(batchId);
859 logDetails.setMessageSequence(String.valueOf(messageSequence));
860 logDetails.setTransactionEnabled(transactionEnabled);
861 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
862 logDetails.setServerIp(request.getLocalAddr());