1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.service.impl;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.text.SimpleDateFormat;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.LinkedList;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.ws.rs.core.MediaType;
34 import org.apache.http.HttpStatus;
36 import com.att.eelf.configuration.EELFLogger;
37 import com.att.eelf.configuration.EELFManager;
38 import org.json.JSONObject;
39 import org.json.JSONTokener;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.stereotype.Service;
43 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
44 import com.att.nsa.cambria.CambriaApiException;
45 import com.att.nsa.cambria.backends.Consumer;
46 import com.att.nsa.cambria.backends.ConsumerFactory;
47 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
48 import com.att.nsa.cambria.backends.MetricsSet;
49 import com.att.nsa.cambria.backends.Publisher;
50 import com.att.nsa.cambria.backends.Publisher.message;
51 import com.att.nsa.cambria.beans.DMaaPCambriaLimiter;
52 import com.att.nsa.cambria.beans.DMaaPContext;
53 import com.att.nsa.cambria.beans.LogDetails;
54 import com.att.nsa.cambria.constants.CambriaConstants;
55 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
56 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
57 import com.att.nsa.cambria.exception.DMaaPResponseCode;
58 import com.att.nsa.cambria.exception.ErrorResponse;
59 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
60 import com.att.nsa.cambria.metabroker.Topic;
61 import com.att.nsa.cambria.resources.CambriaEventSet;
62 import com.att.nsa.cambria.resources.CambriaOutboundEventStream;
63 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
64 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
65 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
66 import com.att.nsa.cambria.service.EventsService;
67 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
68 import com.att.nsa.cambria.utils.Utils;
69 import com.att.nsa.configs.ConfigDbException;
70 import com.att.nsa.drumlin.service.standards.MimeTypes;
71 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
72 import com.att.nsa.security.NsaApiKey;
73 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
74 import com.att.nsa.util.rrConvertor;
76 import kafka.producer.KeyedMessage;
79 * This class provides the functionality to publish and subscribe messages to
// Service implementation behind the DMaaP Message Router events API:
// subscribes (getEvents) and publishes (pushEvents) messages for a topic.
86 public class EventsServiceImpl implements EventsService {
87 //private static final Logger LOG = Logger.getLogger(EventsServiceImpl.class);
// EELF logger shared by all instances of this service.
88 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
// Property key for the maximum number of events per publish batch.
90 private static final String BATCH_LENGTH = "event.batch.length";
// HTTP header name used to detect chunked request bodies.
91 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
// Localized/configured error-message texts used when building ErrorResponse objects.
// NOTE(review): presumably Spring-injected (@Autowired); the annotation is not visible in this view — confirm.
93 private DMaaPErrorMessages errorMessages;
95 //@Value("${metrics.send.cambria.topic}")
96 //private String metricsTopic;
101 * @param consumerGroup
103 * @throws ConfigDbException,
104 * TopicExistsException, AccessDeniedException,
105 * UnavailableException, CambriaApiException, IOException
// Fetches events for a consumer (topic/consumerGroup/clientId triple) and streams
// them back to the HTTP client as JSON. Performs, in order: IP-blacklist check,
// request-parameter parsing (limit/timeout/filter/pretty/meta), topic existence
// check, read-authorization (API-key owner check or AAF "sub" permission), rate
// limiting, consumer acquisition, and response streaming with metrics/offset commit.
// Throws CambriaApiException (with an ErrorResponse body) on any failure path.
110 public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
111 throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
112 CambriaApiException, IOException,DMaaPAccessDeniedException {
// Wall-clock start, used only for the elapsed-time log line at the end.
113 final long startTime = System.currentTimeMillis();
114 final HttpServletRequest req = ctx.getRequest();
116 boolean isAAFTopic=false;
117 // was this host blacklisted?
// NOTE(review): stray second semicolon at end of this statement — harmless (empty statement) but should be removed.
118 final String remoteAddr = Utils.getRemoteAddress(ctx);;
119 if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
// Blacklisted source: respond 403 with a structured error and abort.
122 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
123 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
124 "] is blacklisted. Please contact the cluster management team."
125 ,null,Utils.getFormattedDate(new Date()),topic,
126 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
128 LOG.info(errRes.toString());
129 throw new CambriaApiException(errRes);
// "limit" request parameter caps how many messages are returned; defaults to no limit.
133 int limit = CambriaConstants.kNoLimit;
134 if (req.getParameter("limit") != null) {
135 limit = Integer.parseInt(req.getParameter("limit"));
// Timeout default comes from the msgRtr properties file; a "timeout" request
// parameter overrides the configured value.
138 int timeoutMs= CambriaConstants.kNoTimeout;
139 String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"timeout");
140 if(strtimeoutMS!=null)timeoutMs=Integer.parseInt(strtimeoutMS);
141 //int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", CambriaConstants.kNoTimeout);
142 if (req.getParameter("timeout") != null) {
143 timeoutMs = Integer.parseInt(req.getParameter("timeout"));
146 // By default no filter is applied if filter is not passed as a
147 // parameter in the request URI
148 String topicFilter = CambriaConstants.kNoFilter;
149 if (null != req.getParameter("filter")) {
150 topicFilter = req.getParameter("filter");
152 // pretty to print the messages in new line
153 String prettyval="0";
154 String strPretty=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"pretty");
155 if (null!=strPretty)prettyval=strPretty;
158 String strmeta=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"meta");
159 if (null!=strmeta)metaval=strmeta;
// Convert the "0"/"1"-style property strings into booleans for the stream builder.
161 final boolean pretty = rrConvertor
162 .convertToBooleanBroad(prettyval);
163 // withMeta to print offset along with message
164 final boolean withMeta = rrConvertor
165 .convertToBooleanBroad(metaval);
168 /*final boolean pretty = rrConvertor
169 .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("pretty", "0"));
170 // withMeta to print offset along with message
171 final boolean withMeta = rrConvertor
172 .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("meta", "0"));
// Per-request logger that prefixes every line with [topic/group/client].
174 final LogWrap logger = new LogWrap ( topic, consumerGroup, clientId);
175 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter);
177 // is this user allowed to read this topic?
178 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
179 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
// Unknown topic: respond 404 and abort.
181 if (metatopic == null) {
183 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
184 DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
185 errorMessages.getTopicNotExist()+"-[" + topic + "]",null,Utils.getFormattedDate(new Date()),topic,null,null,
186 clientId,ctx.getRequest().getRemoteHost());
187 LOG.info(errRes.toString());
188 throw new CambriaApiException(errRes);
// The internal metrics topic is exempt from authorization checks below.
190 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
191 if (null==metricTopicname)
192 metricTopicname="msgrtr.apinode.metrics.dmaap";
// No Authorization header + owned topic => enforce API-key read permission.
194 if(null==ctx.getRequest().getHeader("Authorization")&& !topic.equalsIgnoreCase(metricTopicname))
196 if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))){
198 metatopic.checkUserRead(user);
201 // if headers are not provided then user will be null
// Authorization header present but no API-key user => fall back to AAF authentication
// with a topic-scoped "sub" permission.
202 if(user == null && null!=ctx.getRequest().getHeader("Authorization"))
204 // the topic name will be sent by the client
205 // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
206 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
207 String permission = aaf.aafPermissionString(topic, "sub");
208 if(!aaf.aafAuthentication(ctx.getRequest(), permission))
// AAF denied: respond 401 and abort.
210 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
211 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
212 errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,null,null,
213 clientId,ctx.getRequest().getRemoteHost());
214 LOG.info(errRes.toString());
215 throw new DMaaPAccessDeniedException(errRes);
222 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
// Rate limiter is consulted before acquiring a consumer and updated after sending.
224 final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
225 rl.onCall(topic, consumerGroup, clientId);
// NOTE(review): 'c' is declared on a line not visible in this view — presumably 'Consumer c'; confirm.
227 c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs);
229 /* final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c,
230 ctx.getConfigReader().getSettings()).timeout(timeoutMs).limit(limit).filter(topicFilter)
231 .pretty(pretty).withMeta(withMeta)
232 // .atOffset(topicOffset)
// Build the outbound stream with the parsed request options.
234 final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs).limit(limit).filter(topicFilter)
235 .pretty(pretty).withMeta(withMeta).build();
236 coes.setDmaapContext(ctx);
237 coes.setTopic(metatopic);
// Transaction logging is enabled either globally (property) or per AAF topic.
238 if( isTransEnabled() || isAAFTopic ){
239 coes.setTransEnabled(true);
241 coes.setTransEnabled(false);
243 coes.setTopicStyle(isAAFTopic);
245 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
// Stream the messages to the client as application/json.
247 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
249 // No IOException thrown during respondOkWithStream, so commit the
250 // new offsets to all the brokers
252 final int sent = coes.getSentCount();
254 metricsSet.consumeTick(sent);
255 rl.onSend(topic, consumerGroup, clientId, sent);
257 final long elapsedMs = System.currentTimeMillis() - startTime;
258 logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset());
// Backend temporarily unavailable: map to 503.
260 } catch (UnavailableException excp) {
261 logger.warn(excp.getMessage(), excp);
263 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
264 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
265 errorMessages.getServerUnav()+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
266 clientId,ctx.getRequest().getRemoteHost());
267 LOG.info(errRes.toString());
268 throw new CambriaApiException(errRes);
// Already a structured API error: log and rethrow as-is.
// NOTE(review): the rethrow statement is on a line not visible in this view — confirm it exists.
270 } catch (CambriaApiException excp) {
271 logger.warn(excp.getMessage(), excp);
// Any other failure: destroy the consumer (its state may be inconsistent) and map to 503.
273 } catch (Exception excp) {
274 logger.warn("Couldn't respond to client, closing cambria consumer", excp);
275 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
277 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
278 DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
279 "Couldn't respond to client, closing cambria consumer"+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
280 clientId,ctx.getRequest().getRemoteHost());
281 LOG.info(errRes.toString());
282 throw new CambriaApiException(errRes);
284 // If no cache, close the consumer now that we're done with it.
285 boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
286 String strkSetting_EnableCache=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,ConsumerFactory.kSetting_EnableCache);
287 if(null!=strkSetting_EnableCache) kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache);
288 //if (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache, ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
289 if (!kSetting_EnableCache && (c != null)) {
297 * @throws missingReqdSetting
// Public publish entry point. Performs IP-blacklist check and write-authorization
// (API-key owner check or AAF "pub" permission, with AAF enforced for topics whose
// name starts with the configured "enforced.topic.name.AAF" prefix), then inspects
// the request (chunked transfer, media type) and dispatches either to the
// transaction-aware publisher (pushEventsWithTransaction) or the plain one.
301 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
302 final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
303 CambriaApiException, IOException, missingReqdSetting,DMaaPAccessDeniedException {
305 // is this user allowed to write to this topic?
306 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
307 final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
308 boolean isAAFTopic=false;
310 // was this host blacklisted?
311 final String remoteAddr = Utils.getRemoteAddress(ctx);
313 if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
// Blacklisted source: respond 403 with a structured error and abort.
316 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
317 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
318 "] is blacklisted. Please contact the cluster management team."
319 ,null,Utils.getFormattedDate(new Date()),topic,
320 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
322 LOG.info(errRes.toString());
323 throw new CambriaApiException(errRes);
// Topics starting with this configured prefix must authenticate via AAF.
326 String topicNameStd = null;
328 // topicNameStd= ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
329 topicNameStd= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
// The internal metrics topic is exempt from the authorization checks below.
330 String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
331 if (null==metricTopicname)
332 metricTopicname="msgrtr.apinode.metrics.dmaap";
333 boolean topicNameEnforced=false;
334 if (null != topicNameStd && topic.startsWith(topicNameStd) )
336 topicNameEnforced = true;
339 //Here check if the user has rights to publish on the topic
340 //( This will be called when no auth is added or when UEB API Key Authentication is used)
341 //checkUserWrite(user) method will throw an error when there is no Auth header added or when the
342 //user has no publish rights
344 if(null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) && null==ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname))
346 metatopic.checkUserWrite(user);
351 // if headers are not provided then user will be null
// AAF path: enforced-prefix topics, or an Authorization header with no API-key user.
352 if(topicNameEnforced || (user == null && null!=ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)))
354 // the topic name will be sent by the client
355 // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
356 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
357 String permission = aaf.aafPermissionString(topic, "pub");
358 if(!aaf.aafAuthentication(ctx.getRequest(), permission))
// AAF denied: respond 401 and abort.
360 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
361 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
362 errorMessages.getNotPermitted1()+" publish "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,
363 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
365 LOG.info(errRes.toString());
366 throw new DMaaPAccessDeniedException(errRes);
371 final HttpServletRequest req = ctx.getRequest();
373 // check for chunked input
374 boolean chunked = false;
375 if (null != req.getHeader(TRANSFER_ENCODING)) {
376 chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
378 // get the media type, or set it to a generic value if it wasn't
380 String mediaType = req.getContentType();
381 if (mediaType == null || mediaType.length() == 0) {
382 mediaType = MimeTypes.kAppGenericBinary;
// Normalize away an explicit UTF-8 charset suffix so downstream type checks match.
385 if (mediaType.contains("charset=UTF-8")) {
386 mediaType = mediaType.replace("; charset=UTF-8", "").trim();
// Global switch: when "transidUEBtopicreqd" is true, every publish goes through
// the transaction-aware path regardless of topic.
389 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
390 boolean istransidreqd=false;
391 if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")){
392 istransidreqd = true;
// Dispatch: transactional publish (adds transactionId to each message) or plain publish.
395 if (isAAFTopic || istransidreqd ) {
396 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
400 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
411 * @param defaultPartition
414 * @throws ConfigDbException
415 * @throws AccessDeniedException
416 * @throws TopicExistsException
417 * @throws CambriaApiException
418 * @throws IOException
// Plain (non-transactional) publish path: reads messages from the request body via
// CambriaEventSet, accumulates them into Kafka KeyedMessages, sends them in batches
// capped by the "event.batch.length" property (default 16384), then responds with a
// JSON body containing the message count and elapsed server time. Any failure is
// mapped to an ErrorResponse with code PARTIAL_PUBLISH_MSGS.
420 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition,
421 boolean chunked, String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
422 CambriaApiException, IOException {
423 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
425 // setup the event set
426 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
428 // start processing, building a batch to push to the backend
429 final long startMs = System.currentTimeMillis();
// Batch size limit: property override, otherwise 16K messages.
432 long maxEventBatch=1024 * 16;
433 String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
434 if(null!=batchlen)maxEventBatch=Long.parseLong(batchlen);
436 // long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
437 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
438 final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
441 // for each message...
442 Publisher.message m = null;
443 while ((m = events.next()) != null) {
444 // add the message to the batch
446 final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
449 // check if the batch is full
450 final int sizeNow = batch.size();
// NOTE(review): uses '>' here while pushEventsWithTransaction uses '>=' for the same
// check — an off-by-one inconsistency in when a batch is flushed; confirm which is intended.
451 if (sizeNow > maxEventBatch) {
452 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
455 metricsSet.publishTick(sizeNow);
460 // send the pending batch
461 final int sizeNow = batch.size();
463 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
466 metricsSet.publishTick(sizeNow);
470 final long endMs = System.currentTimeMillis();
471 final long totalMs = endMs - startMs;
// NOTE(review): 'count' is declared/accumulated on lines not visible in this view — confirm.
473 LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
// Success response: number of messages accepted and server-side elapsed time.
476 final JSONObject response = new JSONObject();
477 response.put("count", count);
478 response.put("serverTimeMs", totalMs);
479 DMaaPResponseBuilder.respondOk(ctx, response);
// Failure: if the cause already carries an ErrorResponse body, reuse its status and
// message; otherwise default to 404. Always rethrow as CambriaApiException.
481 } catch (Exception excp) {
482 int status = HttpStatus.SC_NOT_FOUND;
483 String errorMsg=null;
484 if(excp instanceof CambriaApiException) {
485 status = ((CambriaApiException) excp).getStatus();
486 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
487 JSONObject errObject = new JSONObject(jsonTokener);
488 errorMsg = (String) errObject.get("message");
491 ErrorResponse errRes = new ErrorResponse(status,
492 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
493 errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
494 null,ctx.getRequest().getRemoteHost(),
496 LOG.info(errRes.toString());
497 throw new CambriaApiException(errRes);
508 * @param partitionKey
512 * @throws ConfigDbException
513 * @throws AccessDeniedException
514 * @throws TopicExistsException
515 * @throws IOException
516 * @throws CambriaApiException
// Transaction-aware publish path. Same batching as the plain path, but additionally:
// stamps every message with transaction/log details (addTransactionDetailsToMessage),
// wraps each payload in a JSON envelope {message, transactionId}, logs batch
// start/end markers with server IP and batch id, and echoes the last transactionId
// back to the client via the "transactionId" response header.
518 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
519 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
520 throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
521 CambriaApiException {
523 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
525 // setup the event set
526 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
528 // start processing, building a batch to push to the backend
529 final long startMs = System.currentTimeMillis();
// Batch size limit: property override, otherwise 16K messages.
531 long maxEventBatch = 1024 * 16;
532 String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
533 if(null!=evenlen)maxEventBatch=Long.parseLong(evenlen);
534 //final long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
535 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
536 final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
538 Publisher.message m = null;
// Sequence number assigned to each message within the request.
539 int messageSequence = 1;
541 final boolean transactionEnabled = true;
542 int publishBatchCount=0;
// Timestamp formatter for batch start/end log lines. SimpleDateFormat is not
// thread-safe in general, but this instance is method-local so that is not a concern here.
543 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
545 //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
547 // for each message...
// NOTE(review): 'batchId' is declared on a line not visible in this view — confirm.
548 batchId=DMaaPContext.getBatchID();
// Last transaction id seen; echoed back in the response header after the loop.
550 String responseTransactionId = null;
552 while ((m = events.next()) != null) {
554 //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
// Attach transaction/log metadata (publisher id/ip, batch id, sequence, etc.) to the message.
557 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
561 // add the message to the batch
564 responseTransactionId = m.getLogDetails().getTransactionId();
// Envelope the payload so the consumer also receives the transaction id.
566 JSONObject jsonObject = new JSONObject();
567 jsonObject.put("message", m.getMessage());
568 jsonObject.put("transactionId", responseTransactionId);
569 final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
570 jsonObject.toString());
573 // check if the batch is full
574 final int sizeNow = batch.size();
575 if (sizeNow >= maxEventBatch) {
576 String startTime = sdf.format(new Date());
577 LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
// Flush the full batch to Kafka, then log per-message publisher details.
579 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
580 //transactionLogs(batch);
581 for (message msg : batch) {
582 LogDetails logDetails = msg.getLogDetails();
583 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Mid-loop send failure: build a PARTIAL_PUBLISH_MSGS error (reusing status/message
// from a CambriaApiException cause when available, else 404) and abort the request.
585 } catch (Exception excp) {
587 int status = HttpStatus.SC_NOT_FOUND;
588 String errorMsg=null;
589 if(excp instanceof CambriaApiException) {
590 status = ((CambriaApiException) excp).getStatus();
591 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
592 JSONObject errObject = new JSONObject(jsonTokener);
593 errorMsg = (String) errObject.get("message");
595 ErrorResponse errRes = new ErrorResponse(status,
596 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
597 "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+ "."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,
598 null,Utils.getFormattedDate(new Date()),topic,
599 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
601 LOG.info(errRes.toString());
602 throw new CambriaApiException(errRes);
606 metricsSet.publishTick(sizeNow);
607 publishBatchCount=sizeNow;
610 String endTime = sdf.format(new Date());
611 LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
612 + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
// Start a fresh batch id for the next batch.
613 batchId=DMaaPContext.getBatchID();
617 // send the pending batch
// Flush whatever remains after the input stream is exhausted (same logging/error
// handling as the in-loop flush above).
618 final int sizeNow = batch.size();
620 String startTime = sdf.format(new Date());
621 LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
623 ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
624 //transactionLogs(batch);
625 for (message msg : batch) {
626 LogDetails logDetails = msg.getLogDetails();
627 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
629 } catch (Exception excp) {
630 int status = HttpStatus.SC_NOT_FOUND;
631 String errorMsg=null;
632 if(excp instanceof CambriaApiException) {
633 status = ((CambriaApiException) excp).getStatus();
634 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
635 JSONObject errObject = new JSONObject(jsonTokener);
636 errorMsg = (String) errObject.get("message");
639 ErrorResponse errRes = new ErrorResponse(status,
640 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
641 "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+ errorMessages.getPublishMsgCount()+count+"."+errorMsg,
642 null,Utils.getFormattedDate(new Date()),topic,
643 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
645 LOG.info(errRes.toString());
646 throw new CambriaApiException(errRes);
649 metricsSet.publishTick(sizeNow);
652 String endTime = sdf.format(new Date());
653 publishBatchCount=sizeNow;
654 LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
655 + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
658 final long endMs = System.currentTimeMillis();
659 final long totalMs = endMs - startMs;
// NOTE(review): 'count' is declared/accumulated on lines not visible in this view — confirm.
661 LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
// Echo the last transaction id so the client can correlate the publish.
663 if (null != responseTransactionId) {
664 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
// Success response: number of messages accepted and server-side elapsed time.
668 final JSONObject response = new JSONObject();
669 response.put("count", count);
670 response.put("serverTimeMs", totalMs);
671 DMaaPResponseBuilder.respondOk(ctx, response);
// Outer catch: same PARTIAL_PUBLISH_MSGS mapping for any failure not handled above.
673 } catch (Exception excp) {
674 int status = HttpStatus.SC_NOT_FOUND;
675 String errorMsg=null;
676 if(excp instanceof CambriaApiException) {
677 status = ((CambriaApiException) excp).getStatus();
678 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
679 JSONObject errObject = new JSONObject(jsonTokener);
680 errorMsg = (String) errObject.get("message");
683 ErrorResponse errRes = new ErrorResponse(
685 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
686 "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
687 Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
689 LOG.info(errRes.toString());
690 throw new CambriaApiException(errRes);
699 * @param messageCreationTime
700 * @param messageSequence
702 * @param transactionEnabled
// Builds LogDetails for a message (via generateLogDetails), records the message's
// byte length, and attaches the details plus the transaction-enabled flag to the message.
704 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
705 final String messageCreationTime, final int messageSequence, final Long batchId,
706 final boolean transactionEnabled) {
707 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
// Record payload size in bytes for publisher-side logging.
709 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
710 msg.setTransactionEnabled(transactionEnabled);
711 msg.setLogDetails(logDetails);
// Small logging helper that prefixes every line with a fixed
// "[topic/consumerGroup/clientId] " identifier for per-request correlation.
721 private static class LogWrap {
// Precomputed "[topic/cgroup/cid] " prefix.
722 private final String fId;
725 * constructor initialization
// Builds the prefix once from the consumer triple.
731 public LogWrap(String topic, String cgroup, String cid) {
732 fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
// Logs an info-level line with the consumer prefix.
739 public void info(String msg) {
// Logs a warn-level line (with throwable) with the consumer prefix.
748 public void warn(String msg, Exception t) {
749 LOG.warn(fId + msg, t);
// Returns true when the "transidUEBtopicreqd" property is set to "true"
// (case-insensitive), i.e. transaction ids are required for all topics.
754 private boolean isTransEnabled() {
755 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
756 boolean istransidreqd=false;
757 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) ){
758 istransidreqd = true;
761 return istransidreqd;
// Populates a LogDetails record for one published message: topic, creation
// timestamp, publisher identity (API key + remote host), batch id, per-request
// sequence number, transaction flag, transaction timestamp, and server IP.
// NOTE(review): the return statement is on a line not visible in this view — confirm it returns logDetails.
765 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
766 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
767 LogDetails logDetails = new LogDetails();
768 logDetails.setTopicId(topicName);
769 logDetails.setMessageTimestamp(messageTimestamp);
// Publisher identity as seen by the server: API key from the request, remote host.
770 logDetails.setPublisherId(Utils.getUserApiKey(request));
771 logDetails.setPublisherIp(request.getRemoteHost());
772 logDetails.setMessageBatchId(batchId);
773 logDetails.setMessageSequence(String.valueOf(messageSequence));
774 logDetails.setTransactionEnabled(transactionEnabled);
775 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
776 logDetails.setServerIp(request.getLocalAddr());
780 /*public String getMetricsTopic() {
784 public void setMetricsTopic(String metricsTopic) {
785 this.metricsTopic = metricsTopic;