1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.net.InetAddress;
27 import java.net.UnknownHostException;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.LinkedList;
34 import java.util.Properties;
36 import javax.servlet.http.HttpServletRequest;
37 import javax.ws.rs.core.MediaType;
39 import org.apache.http.HttpStatus;
40 import org.apache.kafka.clients.consumer.ConsumerRecord;
41 import org.apache.kafka.clients.consumer.ConsumerRecords;
42 import org.apache.kafka.clients.consumer.KafkaConsumer;
43 import org.apache.kafka.clients.producer.ProducerRecord;
44 import org.apache.kafka.common.errors.TopicExistsException;
45 import org.json.JSONObject;
46 import org.json.JSONTokener;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.beans.factory.annotation.Qualifier;
49 import org.springframework.stereotype.Service;
51 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
52 import org.onap.dmaap.dmf.mr.CambriaApiException;
53 import org.onap.dmaap.dmf.mr.backends.Consumer;
54 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
55 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
56 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
57 import org.onap.dmaap.dmf.mr.backends.Publisher;
58 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
59 import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
60 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
61 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
62 import org.onap.dmaap.dmf.mr.beans.LogDetails;
63 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
64 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
65 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
66 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
67 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
69 import org.onap.dmaap.dmf.mr.metabroker.Topic;
70 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
71 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
72 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
73 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
74 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
75 import org.onap.dmaap.dmf.mr.service.EventsService;
76 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
77 import org.onap.dmaap.dmf.mr.utils.Utils;
78 import com.att.eelf.configuration.EELFLogger;
79 import com.att.eelf.configuration.EELFManager;
80 import com.att.nsa.configs.ConfigDbException;
81 import com.att.nsa.drumlin.service.standards.MimeTypes;
82 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
83 import com.att.nsa.security.NsaApiKey;
84 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
85 import com.att.nsa.util.rrConvertor;
* This class provides the functionality to publish and subscribe messages to a topic.
* @author Ramkumar Sembaiyam
95 public class EventsServiceImpl implements EventsService {
96 // private static final Logger LOG =
// EELF logger for this service (replaces the java.util.logging logger commented out above).
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
// Property key for the maximum publish batch size, read in both pushEvents paths.
private static final String BATCH_LENGTH = "event.batch.length";
// HTTP request header name checked to detect chunked request bodies.
private static final String TRANSFER_ENCODING = "Transfer-Encoding";
// Externalized catalog of error-message text used when building ErrorResponse objects.
// NOTE(review): presumably Spring-injected — an @Autowired annotation appears to be
// lost in this truncated view; the setter below also allows injection. Confirm.
private DMaaPErrorMessages errorMessages;
108 // @Value("${metrics.send.cambria.topic}")
111 public DMaaPErrorMessages getErrorMessages() {
112 return errorMessages;
115 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
116 this.errorMessages = errorMessages;
/**
 * Fetches (subscribes) events from the given topic for the consumerGroup/clientId
 * pair and streams them back on the HTTP response, then commits offsets and
 * updates metrics / rate-limiter state.
 *
 * NOTE(review): several physical lines of this method were elided in this view
 * (closing braces, the try/finally around the consumer, the rethrow in the
 * CambriaApiException handler, and some string continuations are missing).
 * Comments below describe only what the visible code shows — confirm control
 * flow against the full source.
 *
 * @param ctx request/response context
 * @param topic topic to consume from
 * @param consumerGroup
 * @param clientId consumer id within the group
 * @throws ConfigDbException,
 * TopicExistsException, AccessDeniedException,
 * UnavailableException, CambriaApiException, IOException
 */
public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
CambriaApiException, IOException, DMaaPAccessDeniedException {
final long startTime = System.currentTimeMillis();
final HttpServletRequest req = ctx.getRequest();
boolean isAAFTopic = false;
// was this host blacklisted?
final String remoteAddr = Utils.getRemoteAddress(ctx);
if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
// 403 for blacklisted source addresses
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
"Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
// limit: max number of messages to return; defaults to "no limit"
int limit = CambriaConstants.kNoLimit;
if (req.getParameter("limit") != null) {
limit = Integer.parseInt(req.getParameter("limit"));
// timeout: server-side default from the property file, overridable per request
int timeoutMs = CambriaConstants.kNoTimeout;
String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
if (strtimeoutMS != null)
timeoutMs = Integer.parseInt(strtimeoutMS);
// int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
if (req.getParameter("timeout") != null) {
timeoutMs = Integer.parseInt(req.getParameter("timeout"));
// By default no filter is applied if filter is not passed as a
// parameter in the request URI
String topicFilter = CambriaConstants.kNoFilter;
if (null != req.getParameter("filter")) {
topicFilter = req.getParameter("filter");
// pretty to print the messaages in new line
String prettyval = "0";
String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
if (null != strPretty)
prettyval = strPretty;
String metaval = "0";
String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
// withMeta to print offset along with message
final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
// is this user allowed to read this topic?
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
if (metatopic == null) {
// 404 when the topic is not registered in the metadata broker
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost());
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
// the metrics topic is exempt from the authorization checks below
String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"metrics.send.cambria.topic");
if (null == metricTopicname)
metricTopicname = "msgrtr.apinode.metrics.dmaap";
// UEB-style (API-key) authorization when no Authorization header is present
if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) {
// throws when the caller lacks read permission on an owned topic
metatopic.checkUserRead(user);
// if headers are not provided then user will be null
if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
// the topic name will be sent by the client
// AAF authorization path: verify the "sub" permission for this topic
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "sub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on "
null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId,
ctx.getRequest().getRemoteHost());
LOG.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
final long elapsedMs1 = System.currentTimeMillis() - startTime;
logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
// host id used to identify this API node; falls back to the local hostname
String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
if (null == lhostId) {
lhostId = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
LOG.info("Unknown Host Exception error occured while getting getting hostid");
CambriaOutboundEventStream coes = null;
// rate-limit the call, obtain a consumer, and stream events back to the client
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost());
c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
ctx.getRequest().getRemoteHost());
coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
.limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
coes.setDmaapContext(ctx);
coes.setTopic(metatopic);
if (isTransEnabled() || isAAFTopic) {
coes.setTransEnabled(true);
coes.setTransEnabled(false);
coes.setTopicStyle(isAAFTopic);
final long elapsedMs2 = System.currentTimeMillis() - startTime;
logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
+ consumerGroup + " " + clientId);
DMaaPResponseBuilder.setNoCacheHeadings(ctx);
DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
// No IOException thrown during respondOkWithStream, so commit the
// new offsets to all the brokers
final int sent = coes.getSentCount();
metricsSet.consumeTick(sent);
rl.onSend(topic, consumerGroup, clientId, sent);
final long elapsedMs = System.currentTimeMillis() - startTime;
logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for "
+ topic + " " + consumerGroup + " " + clientId + " on to the server "
+ ctx.getRequest().getRemoteHost());
} catch (UnavailableException excp) {
// 503: no consumer available right now for this group/client
logger.warn(excp.getMessage(), excp);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost());
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
} catch (java.util.ConcurrentModificationException excp1) {
// 409: the same group/client is likely being served concurrently from another server
LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost());
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
"Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
logger.info(errRes.toString());
throw new CambriaApiException(errRes);
} catch (CambriaApiException excp) {
// already an API error: log it (the rethrow appears elided in this view)
LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId);
catch (Exception excp) {
// System.out.println(excp + "------------------ " + topic+"
// "+consumerGroup+" "+clientId);
// any other failure: tear down the consumer so it is not left in a bad state
logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
+ " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
"Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
logger.info(errRes.toString());
throw new CambriaApiException(errRes);
// If no cache, close the consumer now that we're done with it.
boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
ConsumerFactory.kSetting_EnableCache);
if (null != strkSetting_EnableCache)
kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
// (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,
// ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
if (!kSetting_EnableCache && (c != null)) {
} catch (Exception e) {
logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " "
+ topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
/**
 * Publishes (POSTs) events read from the request body to the given topic,
 * after authorization checks, choosing between the plain publish path and the
 * transaction-id-enabled path.
 *
 * NOTE(review): some physical lines of this method were elided in this view
 * (closing braces and, presumably, the branch that sets isAAFTopic) — confirm
 * against the full source.
 *
 * @param ctx request/response context
 * @param topic topic to publish to
 * @param msg request body stream containing the events
 * @param defaultPartition partition key to use when a message carries none
 * @param requestTime request receive time, forwarded for transaction logging
 * @throws missingReqdSetting
 */
public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException {
// is this user allowed to write to this topic?
final long startMs = System.currentTimeMillis();
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
boolean isAAFTopic = false;
// was this host blacklisted?
final String remoteAddr = Utils.getRemoteAddress(ctx);
if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
// 403 for blacklisted source addresses
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
"Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
// topics whose name starts with this configured prefix must use AAF authorization
String topicNameStd = null;
topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
"enforced.topic.name.AAF");
String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"metrics.send.cambria.topic");
if (null == metricTopicname)
metricTopicname = "msgrtr.apinode.metrics.dmaap";
boolean topicNameEnforced = false;
if (null != topicNameStd && topic.startsWith(topicNameStd)) {
topicNameEnforced = true;
// Here check if the user has rights to publish on the topic
// ( This will be called when no auth is added or when UEB API Key
// Authentication is used)
// checkUserWrite(user) method will throw an error when there is no Auth
// header added or when the
// user has no publish rights
if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))
&& null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
metatopic.checkUserWrite(user);
// if headers are not provided then user will be null
if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
&& !topic.equalsIgnoreCase(metricTopicname))) {
// the topic name will be sent by the client
// AAF authorization path: verify the "pub" permission for this topic
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topic, "pub");
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic
+ " on " + permission,
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
final HttpServletRequest req = ctx.getRequest();
// check for chunked input
boolean chunked = false;
if (null != req.getHeader(TRANSFER_ENCODING)) {
chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
// get the media type, or set it to a generic value if it wasn't
String mediaType = req.getContentType();
if (mediaType == null || mediaType.length() == 0) {
mediaType = MimeTypes.kAppGenericBinary;
// strip the UTF-8 charset suffix so downstream media-type matching works
if (mediaType.contains("charset=UTF-8")) {
mediaType = mediaType.replace("; charset=UTF-8", "").trim();
// property toggle forcing the transaction-id publish path for all topics
String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"transidUEBtopicreqd");
boolean istransidreqd = false;
if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) {
istransidreqd = true;
if (isAAFTopic || istransidreqd) {
pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
final long endMs = System.currentTimeMillis();
final long totalMs = endMs - startMs;
LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
/**
 * Plain (non-transactional) publish path: reads events from the stream,
 * batches them, and hands each full batch to the Kafka-backed publisher.
 *
 * NOTE(review): this view has elided lines — the signature is cut before the
 * mediaType parameter, and the declarations of the message counter ("count")
 * and the try scaffolding are missing. Comments describe visible logic only.
 *
 * @param ctx request/response context
 * @param topic topic to publish to
 * @param msg request body stream
 * @param defaultPartition partition key used when a message carries none
 * @param chunked whether the request body is chunked
 * @throws ConfigDbException
 * @throws AccessDeniedException
 * @throws TopicExistsException
 * @throws CambriaApiException
 * @throws IOException
 */
private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException {
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
// setup the event set
final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
// default max batch size of 16K, overridable via the event.batch.length property
long maxEventBatch = 1024L* 16;
String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
if (null != batchlen)
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
final LinkedList<Publisher.message> batch = new LinkedList<>();
// final ArrayList<KeyedMessage<String, String>> kms = new
final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
// for each message...
Publisher.message m = null;
while ((m = events.next()) != null) {
// add the message to the batch
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
// check if the batch is full
final int sizeNow = batch.size();
if (sizeNow > maxEventBatch) {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
metricsSet.publishTick(sizeNow);
// send the pending batch
final int sizeNow = batch.size();
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
metricsSet.publishTick(sizeNow);
final long endMs = System.currentTimeMillis();
final long totalMs = endMs - startMs;
LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
+ ctx.getRequest().getRemoteHost());
// respond with the number published and elapsed server time
final JSONObject response = new JSONObject();
response.put("count", count);
response.put("serverTimeMs", totalMs);
DMaaPResponseBuilder.respondOk(ctx, response);
} catch (Exception excp) {
// on partial publish, surface the embedded CambriaApiException status/message when available
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
/**
 * Transaction-id-enabled publish path: like the plain path, but stamps each
 * message with LogDetails (transaction id, batch id, sequence), logs batch
 * start/end markers, and returns the last transaction id in a response header.
 *
 * NOTE(review): this view has elided lines — the declarations of "count" and
 * "batchId", the try scaffolding, and several closing braces are missing.
 * Comments describe visible logic only.
 *
 * @param ctx request/response context
 * @param inputStream request body stream
 * @param topic topic to publish to
 * @param partitionKey partition key used when a message carries none
 * @param requestTime request receive time, recorded in each message's LogDetails
 * @param chunked whether the request body is chunked
 * @param mediaType request content type
 * @throws ConfigDbException
 * @throws AccessDeniedException
 * @throws TopicExistsException
 * @throws IOException
 * @throws CambriaApiException
 */
private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException {
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
// setup the event set
final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
// default max batch size of 16K, overridable via the event.batch.length property
long maxEventBatch = 1024L * 16;
String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
maxEventBatch = Long.parseLong(evenlen);
// final long maxEventBatch =
final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
// final ArrayList<KeyedMessage<String, String>> kms = new
final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
Publisher.message m = null;
int messageSequence = 1;
final boolean transactionEnabled = true;
int publishBatchCount = 0;
// NOTE(review): SimpleDateFormat is not thread-safe; safe here only because it is method-local
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
// LOG.warn("Batch Start Id: " +
// for each message...
batchId = DMaaPContext.getBatchID();
String responseTransactionId = null;
while ((m = events.next()) != null) {
// LOG.warn("Batch Start Id: " +
// stamp the message with transaction/batch/sequence details before queuing it
addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
responseTransactionId = m.getLogDetails().getTransactionId();
// wrap the payload with its transaction id before handing it to Kafka
JSONObject jsonObject = new JSONObject();
jsonObject.put("msgWrapMR", m.getMessage());
jsonObject.put("transactionId", responseTransactionId);
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
// check if the batch is full
final int sizeNow = batch.size();
if (sizeNow >= maxEventBatch) {
String startTime = sdf.format(new Date());
LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
// transactionLogs(batch);
for (message msg : batch) {
LogDetails logDetails = msg.getLogDetails();
LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
} catch (Exception excp) {
// mid-stream failure: report partial publish with the count sent so far
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status,
DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
"Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ errorMessages.getPublishMsgCount() + count + "." + errorMsg,
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
metricsSet.publishTick(sizeNow);
publishBatchCount = sizeNow;
String endTime = sdf.format(new Date());
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
+ batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
+ ",Batch End Time=" + endTime + "]");
batchId = DMaaPContext.getBatchID();
// send the pending batch
final int sizeNow = batch.size();
String startTime = sdf.format(new Date());
LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
// transactionLogs(batch);
for (message msg : batch) {
LogDetails logDetails = msg.getLogDetails();
LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
} catch (Exception excp) {
// failure flushing the final batch: same partial-publish reporting as above
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status,
DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
"Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ errorMessages.getPublishMsgCount() + count + "." + errorMsg,
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
metricsSet.publishTick(sizeNow);
String endTime = sdf.format(new Date());
publishBatchCount = sizeNow;
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
+ ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
final long endMs = System.currentTimeMillis();
final long totalMs = endMs - startMs;
LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
// echo the last transaction id back to the caller
if (null != responseTransactionId) {
ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
final JSONObject response = new JSONObject();
response.put("count", count);
response.put("serverTimeMs", totalMs);
DMaaPResponseBuilder.respondOk(ctx, response);
} catch (Exception excp) {
// outer failure path: same partial-publish reporting as the inner handlers
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
"Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ errorMessages.getPublishMsgCount() + count + "." + errorMsg,
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
778 * @param messageCreationTime
779 * @param messageSequence
781 * @param transactionEnabled
783 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
784 final String messageCreationTime, final int messageSequence, final Long batchId,
785 final boolean transactionEnabled) {
786 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
788 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
789 msg.setTransactionEnabled(transactionEnabled);
790 msg.setLogDetails(logDetails);
795 * @author anowarul.islam
798 private static class LogWrap {
799 private final String fId;
802 * constructor initialization
808 public LogWrap(String topic, String cgroup, String cid) {
809 fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
816 public void info(String msg) {
825 public void warn(String msg, Exception t) {
826 LOG.warn(fId + msg, t);
831 public boolean isTransEnabled() {
832 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
833 "transidUEBtopicreqd");
834 boolean istransidreqd = false;
835 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
836 istransidreqd = true;
839 return istransidreqd;
843 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
844 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
845 LogDetails logDetails = new LogDetails();
846 logDetails.setTopicId(topicName);
847 logDetails.setMessageTimestamp(messageTimestamp);
848 logDetails.setPublisherId(Utils.getUserApiKey(request));
849 logDetails.setPublisherIp(request.getRemoteHost());
850 logDetails.setMessageBatchId(batchId);
851 logDetails.setMessageSequence(String.valueOf(messageSequence));
852 logDetails.setTransactionEnabled(transactionEnabled);
853 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
854 logDetails.setServerIp(request.getLocalAddr());
859 * public String getMetricsTopic() { return metricsTopic; }
861 * public void setMetricsTopic(String metricsTopic) { this.metricsTopic =