1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.service.impl;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.text.SimpleDateFormat;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.LinkedList;
31 import javax.servlet.http.HttpServletRequest;
32 import javax.ws.rs.core.MediaType;
34 import org.apache.http.HttpStatus;
36 import com.att.eelf.configuration.EELFLogger;
37 import com.att.eelf.configuration.EELFManager;
38 import org.json.JSONObject;
39 import org.json.JSONTokener;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.stereotype.Service;
43 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
44 import com.att.nsa.cambria.CambriaApiException;
45 import com.att.nsa.cambria.backends.Consumer;
46 import com.att.nsa.cambria.backends.ConsumerFactory;
47 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
48 import com.att.nsa.cambria.backends.MetricsSet;
49 import com.att.nsa.cambria.backends.Publisher;
50 import com.att.nsa.cambria.backends.Publisher.message;
51 import com.att.nsa.cambria.beans.DMaaPCambriaLimiter;
52 import com.att.nsa.cambria.beans.DMaaPContext;
53 import com.att.nsa.cambria.beans.LogDetails;
54 import com.att.nsa.cambria.constants.CambriaConstants;
55 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
56 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
57 import com.att.nsa.cambria.exception.DMaaPResponseCode;
58 import com.att.nsa.cambria.exception.ErrorResponse;
59 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
60 import com.att.nsa.cambria.metabroker.Topic;
61 import com.att.nsa.cambria.resources.CambriaEventSet;
62 import com.att.nsa.cambria.resources.CambriaOutboundEventStream;
63 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
64 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
65 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
66 import com.att.nsa.cambria.service.EventsService;
67 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
68 import com.att.nsa.cambria.utils.Utils;
69 import com.att.nsa.configs.ConfigDbException;
70 import com.att.nsa.drumlin.service.standards.MimeTypes;
71 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
72 import com.att.nsa.security.NsaApiKey;
73 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
74 import com.att.nsa.util.rrConvertor;
76 import kafka.producer.KeyedMessage;
* This class provides the functionality to publish and subscribe messages to
86 public class EventsServiceImpl implements EventsService {
//private static final Logger LOG = Logger.getLogger(EventsServiceImpl.class);
// EELF logger shared by all methods of this service.
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);

// Property key for the maximum number of messages batched per Kafka send.
private static final String BATCH_LENGTH = "event.batch.length";
// HTTP header inspected to detect chunked request bodies.
private static final String TRANSFER_ENCODING = "Transfer-Encoding";

// Externalized error-message texts used when building ErrorResponse payloads; setter-injected.
private DMaaPErrorMessages errorMessages;

//@Value("${metrics.send.cambria.topic}")
//private String metricsTopic;

/**
 * Injection point for the externalized error-messages bean.
 *
 * @param errorMessages holder of error-message texts used in ErrorResponse bodies
 */
public void setErrorMessages(DMaaPErrorMessages errorMessages) {
	this.errorMessages = errorMessages;
/**
 * Fetches messages from an existing topic for the given consumer group /
 * client id and streams them back on the HTTP response.
 *
 * NOTE(review): this listing is an excerpt — several original lines are not
 * visible here (closing braces, the declaration of consumer {@code c}, the
 * enclosing {@code try} block, and the guard condition for the
 * NullPointerException below). Comments marked NOTE(review) flag spots to
 * confirm against the full source.
 *
 * @param ctx           request/response context for this call
 * @param topic         topic to read from
 * @param consumerGroup consumer group name
 * @param clientId      consumer (client) id within the group
 * @throws ConfigDbException          on config-store access failure
 * @throws TopicExistsException       on topic metadata lookup failure
 * @throws AccessDeniedException      when the user may not read the topic
 * @throws UnavailableException       when no consumer can be obtained
 * @throws CambriaApiException        wrapper for API-level error responses
 * @throws IOException                on stream errors
 * @throws DMaaPAccessDeniedException when AAF authorization fails
 */
public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
		throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
		CambriaApiException, IOException,DMaaPAccessDeniedException {
	final long startTime = System.currentTimeMillis();
	final HttpServletRequest req = ctx.getRequest();

	// NOTE(review): the guard condition for this NPE is dropped from the excerpt;
	// presumably it rejects a null/blank topic, group or client id — confirm.
	throw new NullPointerException();

	boolean isAAFTopic=false;
	// was this host blacklisted?
	final String remoteAddr = Utils.getRemoteAddress(ctx);; // NOTE(review): stray second ';'
	if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )

		// Reject blacklisted source addresses with 403 before doing any work.
		ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
				DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
				"] is blacklisted. Please contact the cluster management team."
				,null,Utils.getFormattedDate(new Date()),topic,
				Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
		LOG.info(errRes.toString());
		throw new CambriaApiException(errRes);

	// Optional "limit" query parameter caps the number of messages returned.
	int limit = CambriaConstants.kNoLimit;
	if (req.getParameter("limit") != null) {
		limit = Integer.parseInt(req.getParameter("limit"));

	// Timeout: property-file default, overridable per request via "timeout".
	int timeoutMs= CambriaConstants.kNoTimeout;
	String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"timeout");
	if(strtimeoutMS!=null)timeoutMs=Integer.parseInt(strtimeoutMS);
	//int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", CambriaConstants.kNoTimeout);
	if (req.getParameter("timeout") != null) {
		timeoutMs = Integer.parseInt(req.getParameter("timeout"));

	// By default no filter is applied if filter is not passed as a
	// parameter in the request URI
	String topicFilter = CambriaConstants.kNoFilter;
	if (null != req.getParameter("filter")) {
		topicFilter = req.getParameter("filter");

	// pretty: print the messages each on a new line (property-file driven)
	String prettyval="0";
	String strPretty=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"pretty");
	if (null!=strPretty)prettyval=strPretty;

	// meta: include the offset alongside each message (property-file driven)
	// NOTE(review): the declaration of 'metaval' is dropped from this excerpt.
	String strmeta=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"meta");
	if (null!=strmeta)metaval=strmeta;

	final boolean pretty = rrConvertor
			.convertToBooleanBroad(prettyval);
	// withMeta to print offset along with message
	final boolean withMeta = rrConvertor
			.convertToBooleanBroad(metaval);

	/*final boolean pretty = rrConvertor
	.convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("pretty", "0"));
	// withMeta to print offset along with message
	final boolean withMeta = rrConvertor
	.convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("meta", "0")); */

	final LogWrap logger = new LogWrap ( topic, consumerGroup, clientId);
	logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter);

	// is this user allowed to read this topic?
	final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
	final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);

	if (metatopic == null) {
		// 404 when the topic does not exist in the meta broker.
		ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
				DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
				errorMessages.getTopicNotExist()+"-[" + topic + "]",null,Utils.getFormattedDate(new Date()),topic,null,null,
				clientId,ctx.getRequest().getRemoteHost());
		LOG.info(errRes.toString());
		throw new CambriaApiException(errRes);

	// Metrics topic is exempt from the authorization checks below.
	String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
	if (null==metricTopicname)
		metricTopicname="msgrtr.apinode.metrics.dmaap";

	// No Authorization header: fall back to the UEB API-key ownership check.
	if(null==ctx.getRequest().getHeader("Authorization")&& !topic.equalsIgnoreCase(metricTopicname))

		if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))){
			// throws AccessDeniedException when the key holder lacks read rights
			metatopic.checkUserRead(user);

	// if headers are not provided then user will be null; with an
	// Authorization header present, authenticate the read via AAF instead.
	if(user == null && null!=ctx.getRequest().getHeader("Authorization"))

		// the topic name will be sent by the client
		// String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
		DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
		String permission = aaf.aafPermissionString(topic, "sub");
		if(!aaf.aafAuthentication(ctx.getRequest(), permission))

			ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
					DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
					errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,null,null,
					clientId,ctx.getRequest().getRemoteHost());
			LOG.info(errRes.toString());
			throw new DMaaPAccessDeniedException(errRes);

	final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();

	// Rate-limit this (topic, group, client) tuple before consuming.
	final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
	rl.onCall(topic, consumerGroup, clientId);

	// NOTE(review): 'c' is declared on a line dropped from this excerpt.
	c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs);

	/* final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c,
	ctx.getConfigReader().getSettings()).timeout(timeoutMs).limit(limit).filter(topicFilter)
	.pretty(pretty).withMeta(withMeta)
	// .atOffset(topicOffset)
	*/

	// Build the outbound stream with the effective timeout/limit/filter settings.
	final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs).limit(limit).filter(topicFilter)
			.pretty(pretty).withMeta(withMeta).build();
	coes.setDmaapContext(ctx);
	coes.setTopic(metatopic);
	if( isTransEnabled() || isAAFTopic ){
		coes.setTransEnabled(true);
		// NOTE(review): the '} else {' between these two calls is dropped from the excerpt.
		coes.setTransEnabled(false);
	coes.setTopicStyle(isAAFTopic);

	DMaaPResponseBuilder.setNoCacheHeadings(ctx);

	DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);

	// No IOException thrown during respondOkWithStream, so commit the
	// new offsets to all the brokers
	final int sent = coes.getSentCount();

	metricsSet.consumeTick(sent);
	rl.onSend(topic, consumerGroup, clientId, sent);

	final long elapsedMs = System.currentTimeMillis() - startTime;
	logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset());

	} catch (UnavailableException excp) {
		logger.warn(excp.getMessage(), excp);

		// 503: no consumer available for this group/client right now.
		ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
				DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
				errorMessages.getServerUnav()+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
				clientId,ctx.getRequest().getRemoteHost());
		LOG.info(errRes.toString());
		throw new CambriaApiException(errRes);

	} catch (CambriaApiException excp) {
		// Already an API-level error; log and (per dropped line) presumably rethrow.
		logger.warn(excp.getMessage(), excp);
	} catch (Exception excp) {
		// Unexpected failure: destroy the consumer so a broken instance is not reused.
		logger.warn("Couldn't respond to client, closing cambria consumer", excp);
		ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);

		ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
				DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
				"Couldn't respond to client, closing cambria consumer"+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
				clientId,ctx.getRequest().getRemoteHost());
		LOG.info(errRes.toString());
		throw new CambriaApiException(errRes);

	// If no cache, close the consumer now that we're done with it.
	boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
	String strkSetting_EnableCache=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,ConsumerFactory.kSetting_EnableCache);
	if(null!=strkSetting_EnableCache) kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache);
	//if (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache, ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
	if (!kSetting_EnableCache && (c != null)) {
/**
 * Publishes messages from the request body to a topic, after blacklist and
 * authorization checks; routes to the transaction-aware path when the topic
 * is AAF-enforced or transaction ids are required, otherwise to the plain
 * batch publish path.
 *
 * NOTE(review): excerpt — several closing braces and an 'else' between the
 * two publish calls at the bottom are dropped from this listing.
 *
 * @param ctx              request/response context
 * @param topic            topic to publish to
 * @param msg              raw request body stream containing the events
 * @param defaultPartition partition key to use when a message has none
 * @param requestTime      timestamp string recorded in transaction logs
 * @throws missingReqdSetting when a required configuration setting is absent
 */
public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
		final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
		CambriaApiException, IOException, missingReqdSetting,DMaaPAccessDeniedException {

	// is this user allowed to write to this topic?
	final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
	final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
	boolean isAAFTopic=false;

	// was this host blacklisted?
	final String remoteAddr = Utils.getRemoteAddress(ctx);

	if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )

		// 403 for blacklisted source addresses.
		ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
				DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
				"] is blacklisted. Please contact the cluster management team."
				,null,Utils.getFormattedDate(new Date()),topic,
				Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
		LOG.info(errRes.toString());
		throw new CambriaApiException(errRes);

	String topicNameStd = null;

	// AAF enforcement: topics whose name starts with this prefix must authorize via AAF.
	// topicNameStd= ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
	topicNameStd= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
	String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
	if (null==metricTopicname)
		metricTopicname="msgrtr.apinode.metrics.dmaap";
	boolean topicNameEnforced=false;
	if (null != topicNameStd && topic.startsWith(topicNameStd) )
		topicNameEnforced = true;

	//Here check if the user has rights to publish on the topic
	//( This will be called when no auth is added or when UEB API Key Authentication is used)
	//checkUserWrite(user) method will throw an error when there is no Auth header added or when the
	//user has no publish rights

	if(null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) && null==ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname))
		metatopic.checkUserWrite(user);

	// if headers are not provided then user will be null; enforced topics
	// and Authorization-header requests go through AAF instead.
	if(topicNameEnforced || (user == null && null!=ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)))

		// the topic name will be sent by the client
		// String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
		DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
		String permission = aaf.aafPermissionString(topic, "pub");
		if(!aaf.aafAuthentication(ctx.getRequest(), permission))

			ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
					DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
					errorMessages.getNotPermitted1()+" publish "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,
					Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
			LOG.info(errRes.toString());
			throw new DMaaPAccessDeniedException(errRes);

	final HttpServletRequest req = ctx.getRequest();

	// check for chunked input
	boolean chunked = false;
	if (null != req.getHeader(TRANSFER_ENCODING)) {
		chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");

	// get the media type, or set it to a generic value if it wasn't
	String mediaType = req.getContentType();
	if (mediaType == null || mediaType.length() == 0) {
		mediaType = MimeTypes.kAppGenericBinary;

	// Normalize away an explicit UTF-8 charset suffix.
	if (mediaType.contains("charset=UTF-8")) {
		mediaType = mediaType.replace("; charset=UTF-8", "").trim();

	// Property flag: are transaction ids required for UEB topics?
	String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
	boolean istransidreqd=false;
	if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")){
		istransidreqd = true;

	if (isAAFTopic || istransidreqd ) {
		pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
		// NOTE(review): the '} else {' between these two calls is dropped from the excerpt.
		pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
/**
 * Plain (non-transactional) publish path: reads events from the request
 * stream, batches them, and sends them to the Kafka-backed publisher,
 * responding with the published count and elapsed time.
 *
 * NOTE(review): excerpt — the enclosing 'try', the 'count' declaration, the
 * message-body argument of the KeyedMessage constructor, and several closing
 * braces are dropped from this listing.
 *
 * @param ctx              request/response context
 * @param topic            topic to publish to
 * @param msg              raw request body stream containing the events
 * @param defaultPartition partition key used when a message has none
 * @param chunked          whether the request body is chunked
 * @param mediaType        effective content type of the request body
 * @throws ConfigDbException
 * @throws AccessDeniedException
 * @throws TopicExistsException
 * @throws CambriaApiException
 * @throws IOException
 */
private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition,
		boolean chunked, String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
		CambriaApiException, IOException {
	final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();

	// setup the event set
	final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);

	// start processing, building a batch to push to the backend
	final long startMs = System.currentTimeMillis();

	// Batch size: 16K default, overridable via the event.batch.length property.
	long maxEventBatch=1024 * 16;
	String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
	if(null!=batchlen)maxEventBatch=Long.parseLong(batchlen);

	// long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
	final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
	final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();

	// for each message...
	Publisher.message m = null;
	while ((m = events.next()) != null) {
		// add the message to the batch
		final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),

		// check if the batch is full
		final int sizeNow = batch.size();
		// NOTE(review): '>' here vs '>=' in pushEventsWithTransaction — confirm which is intended.
		if (sizeNow > maxEventBatch) {
			ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);

			metricsSet.publishTick(sizeNow);

	// send the pending batch
	final int sizeNow = batch.size();

		ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);

		metricsSet.publishTick(sizeNow);

	final long endMs = System.currentTimeMillis();
	final long totalMs = endMs - startMs;

	LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);

	// Success response: total published count and server-side elapsed time.
	final JSONObject response = new JSONObject();
	response.put("count", count);
	response.put("serverTimeMs", totalMs);
	DMaaPResponseBuilder.respondOk(ctx, response);

	} catch (Exception excp) {
		// Surface partial-publish failures: reuse the wrapped status/message
		// when the cause is already a CambriaApiException, else 404.
		int status = HttpStatus.SC_NOT_FOUND;
		String errorMsg=null;
		if(excp instanceof CambriaApiException) {
			status = ((CambriaApiException) excp).getStatus();
			JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
			JSONObject errObject = new JSONObject(jsonTokener);
			errorMsg = (String) errObject.get("message");

		ErrorResponse errRes = new ErrorResponse(status,
				DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
				errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
				null,ctx.getRequest().getRemoteHost(),
		LOG.info(errRes.toString());
		throw new CambriaApiException(errRes);
/**
 * Transaction-aware publish path: stamps every message with transaction /
 * batch log details, wraps each payload in a JSON envelope carrying its
 * transaction id, batches sends, logs per-message publisher details, and
 * echoes the last transaction id back in the "transactionId" response header.
 *
 * NOTE(review): excerpt — the enclosing 'try' blocks, the 'count' and
 * 'batchId' declarations, the trailing 'transactionEnabled);' argument of
 * addTransactionDetailsToMessage, and many closing braces are dropped from
 * this listing.
 *
 * @param ctx          request/response context
 * @param inputStream  raw request body stream containing the events
 * @param topic        topic to publish to
 * @param partitionKey partition key used when a message has none
 * @param requestTime  timestamp string recorded in transaction logs
 * @param chunked      whether the request body is chunked
 * @param mediaType    effective content type of the request body
 * @throws ConfigDbException
 * @throws AccessDeniedException
 * @throws TopicExistsException
 * @throws IOException
 * @throws CambriaApiException
 */
private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
		final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
		throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
		CambriaApiException {

	final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();

	// setup the event set
	final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);

	// start processing, building a batch to push to the backend
	final long startMs = System.currentTimeMillis();

	// Batch size: 16K default, overridable via the event.batch.length property.
	long maxEventBatch = 1024 * 16;
	String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
	if(null!=evenlen)maxEventBatch=Long.parseLong(evenlen);
	//final long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
	final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
	final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();

	Publisher.message m = null;
	int messageSequence = 1;

	final boolean transactionEnabled = true;
	int publishBatchCount=0;
	// NOTE(review): SimpleDateFormat is not thread-safe; acceptable only because
	// this instance is method-local. java.time.DateTimeFormatter would be safer.
	SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");

	//LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));

	// for each message...
	// NOTE(review): 'batchId' is declared on a line dropped from this excerpt.
	batchId=DMaaPContext.getBatchID();

	String responseTransactionId = null;

	while ((m = events.next()) != null) {

		//LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));

		// Stamp the message with its transaction/batch log details.
		addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,

		// add the message to the batch

		responseTransactionId = m.getLogDetails().getTransactionId();

		// Wrap payload + transaction id into the JSON envelope that is published.
		JSONObject jsonObject = new JSONObject();
		jsonObject.put("message", m.getMessage());
		jsonObject.put("transactionId", responseTransactionId);
		final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
				jsonObject.toString());

		// check if the batch is full
		final int sizeNow = batch.size();
		if (sizeNow >= maxEventBatch) {
			String startTime = sdf.format(new Date());
			LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");

			ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
			//transactionLogs(batch);
			for (message msg : batch) {
				LogDetails logDetails = msg.getLogDetails();
				LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());

			} catch (Exception excp) {

				// Mid-stream batch failure: report partial publish with count so far.
				int status = HttpStatus.SC_NOT_FOUND;
				String errorMsg=null;
				if(excp instanceof CambriaApiException) {
					status = ((CambriaApiException) excp).getStatus();
					JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
					JSONObject errObject = new JSONObject(jsonTokener);
					errorMsg = (String) errObject.get("message");

				ErrorResponse errRes = new ErrorResponse(status,
						DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
						"Transaction-"+errorMessages.getPublishMsgError()+":"+topic+ "."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,
						null,Utils.getFormattedDate(new Date()),topic,
						Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),

				LOG.info(errRes.toString());
				throw new CambriaApiException(errRes);

			metricsSet.publishTick(sizeNow);
			publishBatchCount=sizeNow;

			String endTime = sdf.format(new Date());
			LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
					+ ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
			// New batch id for the next batch within this request.
			batchId=DMaaPContext.getBatchID();

	// send the pending batch
	final int sizeNow = batch.size();

		String startTime = sdf.format(new Date());
		LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");

		ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
		//transactionLogs(batch);
		for (message msg : batch) {
			LogDetails logDetails = msg.getLogDetails();
			LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());

		} catch (Exception excp) {
			// Final-batch failure: same partial-publish reporting as above.
			int status = HttpStatus.SC_NOT_FOUND;
			String errorMsg=null;
			if(excp instanceof CambriaApiException) {
				status = ((CambriaApiException) excp).getStatus();
				JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
				JSONObject errObject = new JSONObject(jsonTokener);
				errorMsg = (String) errObject.get("message");

			ErrorResponse errRes = new ErrorResponse(status,
					DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
					"Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+ errorMessages.getPublishMsgCount()+count+"."+errorMsg,
					null,Utils.getFormattedDate(new Date()),topic,
					Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),

			LOG.info(errRes.toString());
			throw new CambriaApiException(errRes);

		metricsSet.publishTick(sizeNow);

		String endTime = sdf.format(new Date());
		publishBatchCount=sizeNow;
		LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
				+ ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");

	final long endMs = System.currentTimeMillis();
	final long totalMs = endMs - startMs;

	LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);

	// Echo the last message's transaction id back to the publisher.
	if (null != responseTransactionId) {
		ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));

	// Success response: total published count and server-side elapsed time.
	final JSONObject response = new JSONObject();
	response.put("count", count);
	response.put("serverTimeMs", totalMs);
	DMaaPResponseBuilder.respondOk(ctx, response);

	} catch (Exception excp) {
		int status = HttpStatus.SC_NOT_FOUND;
		String errorMsg=null;
		if(excp instanceof CambriaApiException) {
			status = ((CambriaApiException) excp).getStatus();
			JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
			JSONObject errObject = new JSONObject(jsonTokener);
			errorMsg = (String) errObject.get("message");

		// NOTE(review): the 'status' argument line of this constructor call is
		// dropped from the excerpt.
		ErrorResponse errRes = new ErrorResponse(
				DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
				"Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
				Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),

		LOG.info(errRes.toString());
		throw new CambriaApiException(errRes);
/**
 * Attaches transaction log details (and the message's byte length) to a
 * message before it is published, and flags it transaction-enabled.
 *
 * NOTE(review): excerpt — the trailing argument line of the
 * generateLogDetails call (presumably 'transactionEnabled);') is dropped
 * from this listing.
 *
 * @param msg                 message being decorated (mutated in place)
 * @param topic               topic the message targets
 * @param request             originating HTTP request
 * @param messageCreationTime creation time string recorded in the log details
 * @param messageSequence     1-based position of the message within the request
 * @param batchId             id of the publish batch this message belongs to
 * @param transactionEnabled  whether transaction logging is active
 */
private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
		final String messageCreationTime, final int messageSequence, final Long batchId,
		final boolean transactionEnabled) {
	LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
	logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
	msg.setTransactionEnabled(transactionEnabled);
	msg.setLogDetails(logDetails);
/**
 * Small logging helper that prefixes every message with the
 * "[topic/consumerGroup/clientId] " identity of the current fetch.
 *
 * NOTE(review): excerpt — the info() body and the class/method closing
 * braces are dropped from this listing.
 */
private static class LogWrap {
	// Precomputed "[topic/cgroup/cid] " prefix applied to every log line.
	private final String fId;

	/**
	 * Builds the log prefix from the fetch identity.
	 *
	 * @param topic  topic being read
	 * @param cgroup consumer group name
	 * @param cid    consumer (client) id
	 */
	public LogWrap(String topic, String cgroup, String cid) {
		fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";

	// Logs an info-level message with the fetch-identity prefix.
	public void info(String msg) {

	// Logs a warning with the fetch-identity prefix and the causing exception.
	public void warn(String msg, Exception t) {
		LOG.warn(fId + msg, t);
761 private boolean isTransEnabled() {
762 String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
763 boolean istransidreqd=false;
764 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) ){
765 istransidreqd = true;
768 return istransidreqd;
772 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
773 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
774 LogDetails logDetails = new LogDetails();
775 logDetails.setTopicId(topicName);
776 logDetails.setMessageTimestamp(messageTimestamp);
777 logDetails.setPublisherId(Utils.getUserApiKey(request));
778 logDetails.setPublisherIp(request.getRemoteHost());
779 logDetails.setMessageBatchId(batchId);
780 logDetails.setMessageSequence(String.valueOf(messageSequence));
781 logDetails.setTransactionEnabled(transactionEnabled);
782 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
783 logDetails.setServerIp(request.getLocalAddr());
787 /*public String getMetricsTopic() {
791 public void setMetricsTopic(String metricsTopic) {
792 this.metricsTopic = metricsTopic;