1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import org.apache.commons.lang.StringUtils;
34 import org.apache.commons.lang.math.NumberUtils;
35 import org.apache.http.HttpStatus;
36 import org.apache.kafka.clients.producer.ProducerRecord;
37 import org.apache.kafka.common.errors.TopicExistsException;
38 import org.json.JSONObject;
39 import org.json.JSONTokener;
40 import org.onap.dmaap.dmf.mr.CambriaApiException;
41 import org.onap.dmaap.dmf.mr.backends.Consumer;
42 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
43 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
44 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
45 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
46 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
47 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
48 import org.onap.dmaap.dmf.mr.beans.LogDetails;
49 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
50 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
51 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
52 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
53 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
54 import org.onap.dmaap.dmf.mr.metabroker.Topic;
55 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
56 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
57 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
58 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
59 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
60 import org.onap.dmaap.dmf.mr.service.EventsService;
61 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
62 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
63 import org.onap.dmaap.dmf.mr.utils.Utils;
64 import org.springframework.beans.factory.annotation.Autowired;
65 import org.springframework.stereotype.Service;
67 import javax.servlet.http.HttpServletRequest;
68 import javax.ws.rs.core.MediaType;
69 import java.io.IOException;
70 import java.io.InputStream;
71 import java.net.InetAddress;
72 import java.net.UnknownHostException;
73 import java.text.SimpleDateFormat;
74 import java.util.ArrayList;
75 import java.util.ConcurrentModificationException;
76 import java.util.Date;
77 import java.util.LinkedList;
* This class provides the functionality to publish and subscribe messages to
83 * @author Ramkumar Sembaiyam
87 public class EventsServiceImpl implements EventsService {
// Class-wide EELF logger for this service implementation.
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
// Property key controlling the maximum publish batch size.
private static final String BATCH_LENGTH = "event.batch.length";
// HTTP header consulted to detect chunked transfer encoding on publish.
private static final String TRANSFER_ENCODING = "Transfer-Encoding";
// Property / request-parameter key for the consumer fetch timeout (ms).
private static final String TIMEOUT_PROPERTY = "timeout";
// Action names used when building AAF permission strings and choosing ACL checks.
private static final String SUBSCRIBE_ACTION = "sub";
private static final String PUBLISH_ACTION = "pub";
// Catalogue of user-facing error messages; injected via setter (Spring-managed bean).
private DMaaPErrorMessages errorMessages;
99 String getPropertyFromAJSCmap(String propertyKey) {
100 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
103 public DMaaPErrorMessages getErrorMessages() {
104 return errorMessages;
107 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
108 this.errorMessages = errorMessages;
114 * @param consumerGroup
116 * @throws ConfigDbException,
117 * TopicExistsException, AccessDeniedException,
118 * UnavailableException, CambriaApiException, IOException
// Fetches messages for the (consumerGroup, clientId) pair from the given topic and
// streams them to the HTTP response, then commits the consumed offsets.
// Flow: IP-blacklist check -> topic lookup -> AAF/ACL authorization -> rate-limit ->
// acquire consumer -> stream -> commit -> metrics/rate-limit bookkeeping.
// NOTE(review): this extract is missing lines from the original file (the "try {",
// the "finally {", several closing braces and some call arguments) — do not treat
// the visible text as compilable; structure below is annotated as-is.
public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
throws ConfigDbException, AccessDeniedException, UnavailableException,
CambriaApiException, IOException {
final long startTime = System.currentTimeMillis();
final HttpServletRequest req = ctx.getRequest();
final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
final String remoteHost = req.getRemoteHost();
// Pre-built provider so every error path below reports consistent context.
ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
.withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
// Rejects blacklisted caller IPs before any broker work is done.
validateIpBlacklist(errRespProvider, ctx);
final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
if (metaTopic == null) {
throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
// NOTE(review): closing "}" of the null-check lost in extraction.
// AAF check (or owner-based ACL) for the subscribe action.
boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
final long elapsedMs1 = System.currentTimeMillis() - startTime;
logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
// Fetch options come from config properties and request parameters.
final boolean pretty = isPrettyPrintEnabled();
final boolean withMeta = isMetaOffsetEnabled();
int timeoutMs = getMessageTimeout(req);
int limit = getMessageLimit(req);
String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
Consumer consumer = null;
// NOTE(review): the original "try {" opening the consume block is missing here.
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
// Rate-limit accounting for this call; may throttle/deny.
rl.onCall(topic, consumerGroup, clientId, remoteHost);
consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
.limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
coes.setDmaapContext(ctx);
coes.setTopic(metaTopic);
// Transactions are on when globally configured or when the topic is AAF-protected.
coes.setTransEnabled(isTransEnabled() || isAAFTopic);
coes.setTopicStyle(isAAFTopic);
final long elapsedMs2 = System.currentTimeMillis() - startTime;
logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
+ consumerGroup + " " + clientId);
respondOkWithStream(ctx, coes);
// No IOException thrown during respondOkWithStream, so commit the
// new offsets to all the brokers
consumer.commitOffsets();
final int sent = coes.getSentCount();
metricsSet.consumeTick(sent);
rl.onSend(topic, consumerGroup, clientId, sent);
final long elapsedMs = System.currentTimeMillis() - startTime;
logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
+ topic + " " + consumerGroup + " " + clientId + " on to the server "
// Broker/consumer temporarily unavailable -> 503-style error response.
} catch (UnavailableException excp) {
logger.warn(excp.getMessage(), excp);
ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
// Same consumer touched concurrently by another request.
} catch (ConcurrentModificationException excp1) {
LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
logger.info(errRes.toString());
throw new CambriaApiException(errRes);
// Any other failure: destroy the (possibly broken) cached consumer, then report.
} catch (Exception excp) {
logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
+ " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
logger.info(errRes.toString());
throw new CambriaApiException(errRes);
// NOTE(review): "} finally {" lost in extraction — the close below is a cleanup path.
// When the consumer cache is disabled the consumer must be closed per request.
if (consumer != null && !isCacheEnabled()) {
} catch (Exception e) {
logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
+ topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
216 private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
217 final String remoteAddr = Utils.getRemoteAddress(ctx);
218 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
219 ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
220 LOG.info(errRes.toString());
221 throw new CambriaApiException(errRes);
// Authorizes the caller for the given action ("sub"/"pub") on the topic.
// Skips all checks for the internal metrics topic. For CADI-enforced topic names
// it performs AAF permission checks; otherwise, for owned topics, it falls back
// to owner-based read/write ACL checks.
// NOTE(review): the "return isAAFTopic;" and several closing braces are missing
// from this extract; isAAFTopic is presumably set true on the AAF path — confirm
// against the full source.
private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
boolean isAAFTopic = false;
String metricTopicName = getMetricTopicName();
// The metrics topic is exempt from authorization.
if(!metricTopicName.equalsIgnoreCase(topicName)) {
if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
String permission = aaf.aafPermissionString(topicName, action);
if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
LOG.info(errRes.toString());
throw new DMaaPAccessDeniedException(errRes);
// Owner-based ACL path: only enforced when the topic has a non-empty owner.
} else if(metaTopic!=null && null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
if(SUBSCRIBE_ACTION.equals(action)) {
metaTopic.checkUserRead(user);
} else if(PUBLISH_ACTION.equals(action)) {
metaTopic.checkUserWrite(user);
253 boolean isCadiEnabled() {
254 return Utils.isCadiEnabled();
257 void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
258 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
259 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
262 private int getMessageLimit(HttpServletRequest request) {
263 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
266 private int getMessageTimeout(HttpServletRequest request) {
267 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
268 int defaultTimeoutMs = StringUtils.isNotEmpty(timeoutMsAsString) ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
269 CambriaConstants.kNoTimeout;
271 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
272 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
275 private boolean isPrettyPrintEnabled() {
276 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
279 private boolean isMetaOffsetEnabled() {
280 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
283 private boolean isTopicNameEnforcedAaf(String topicName) {
284 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
285 return StringUtils.isNotEmpty(topicNameStd) && topicName.startsWith(topicNameStd);
288 private boolean isCacheEnabled() {
289 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
290 return StringUtils.isNotEmpty(cachePropsSetting) ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
// Resolves a host identifier when the "clusterhostid" property is not configured.
// NOTE(review): this extract is missing the "try {" line and closing braces, and
// the getCanonicalHostName() result is discarded as shown — in the full source it
// is presumably assigned to lhostId; confirm before relying on this method.
private void verifyHostId() {
String lhostId = getPropertyFromAJSCmap("clusterhostid");
if (StringUtils.isEmpty(lhostId)) {
// Falls back to the local machine's canonical host name.
InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
305 private String getMetricTopicName() {
306 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
307 return StringUtils.isNotEmpty(metricTopicFromProps) ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
311 * @throws missingReqdSetting
// Publishes the request body to the topic. After blacklist and authorization
// checks it dispatches either to the transactional publish path (AAF topic or
// transactions globally required) or to the plain batch publish path.
// NOTE(review): the "} else {" between the two dispatch calls and the closing
// braces are missing from this extract.
public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
final String requestTime) throws ConfigDbException, AccessDeniedException,
CambriaApiException, IOException, missingReqdSetting {
final long startMs = System.currentTimeMillis();
String remoteHost = ctx.getRequest().getRemoteHost();
// Error provider carries publisher identity for consistent error payloads.
ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
.withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
.withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
validateIpBlacklist(errRespProvider, ctx);
final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
// Authorization for the publish action; also tells us if this is an AAF topic.
final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
final HttpServletRequest req = ctx.getRequest();
boolean chunked = isRequestedChunk(req);
String mediaType = getMediaType(req);
boolean transactionRequired = isTransactionIdRequired();
if (isAAFTopic || transactionRequired) {
pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
final long endMs = System.currentTimeMillis();
final long totalMs = endMs - startMs;
LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
347 private boolean isRequestedChunk(HttpServletRequest request) {
348 return null != request.getHeader(TRANSFER_ENCODING) &&
349 request.getHeader(TRANSFER_ENCODING).contains("chunked");
352 private String getMediaType(HttpServletRequest request) {
353 String mediaType = request.getContentType();
354 if (mediaType == null || mediaType.length() == 0) {
355 return MimeTypes.kAppGenericBinary;
357 return mediaType.replace("; charset=UTF-8", "").trim();
360 private boolean isTransactionIdRequired() {
361 String transIdReqProperty = getPropertyFromAJSCmap("transidUEBtopicreqd");
362 return StringUtils.isNotEmpty(transIdReqProperty) && transIdReqProperty.equalsIgnoreCase("true");
370 * @param defaultPartition
373 * @throws ConfigDbException
374 * @throws AccessDeniedException
375 * @throws TopicExistsException
376 * @throws CambriaApiException
377 * @throws IOException
// Non-transactional publish path: reads messages from the request body, batches
// them up to the configured size, and sends each full batch (plus the final
// partial batch) to the Kafka publisher. Responds with a JSON count/latency body.
// NOTE(review): this extract is missing lines from the original (the "try {",
// the message variable declaration, batch.add()/pms.add() calls, the count
// variable, and closing braces) — annotations below describe the visible flow.
private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
// setup the event set
final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
// Default batch cap is 16K messages unless overridden by event.batch.length.
long maxEventBatch = 1024L* 16;
String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
if (null != batchlen && !batchlen.isEmpty())
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
final LinkedList<message> batch = new LinkedList<>();
// final ArrayList<KeyedMessage<String, String>> kms = new
final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
// for each message...
while ((m = events.next()) != null) {
// add the message to the batch
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
// check if the batch is full
final int sizeNow = batch.size();
if (sizeNow > maxEventBatch) {
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
// Flush the full batch to Kafka.
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
metricsSet.publishTick(sizeNow);
// send the pending batch
final int sizeNow = batch.size();
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
metricsSet.publishTick(sizeNow);
final long endMs = System.currentTimeMillis();
final long totalMs = endMs - startMs;
LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
+ ctx.getRequest().getRemoteHost());
// Success response: number of messages accepted and server-side latency.
final JSONObject response = new JSONObject();
response.put("count", count);
response.put("serverTimeMs", totalMs);
respondOk(ctx, response);
// Failure mid-stream: report a partial-publish error, preserving any status
// and message carried by a CambriaApiException cause.
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
478 * @param partitionKey
482 * @throws ConfigDbException
483 * @throws AccessDeniedException
484 * @throws TopicExistsException
485 * @throws IOException
486 * @throws CambriaApiException
// Transactional publish path: like the plain path, but each message is stamped
// with transaction/log details (batch id, sequence, publisher identity), batch
// boundaries are logged with start/end timestamps, and the last transaction id
// is echoed back in the "transactionId" response header and body.
// NOTE(review): this extract is missing lines from the original (the "try {",
// message declaration, batch/pms add calls, count variable, several closing
// braces and log-call arguments) — annotations describe the visible flow only.
private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
// setup the event set
final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
// Default batch cap is 16K messages unless overridden by event.batch.length.
long maxEventBatch = 1024L * 16;
String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
if (null != evenlen && !evenlen.isEmpty())
maxEventBatch = Long.parseLong(evenlen);
// final long maxEventBatch =
final LinkedList<message> batch = new LinkedList<message>();
// final ArrayList<KeyedMessage<String, String>> kms = new
final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
int messageSequence = 1;
final boolean transactionEnabled = true;
int publishBatchCount = 0;
// Local SimpleDateFormat (not shared) used only for batch boundary log lines.
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
// LOG.warn("Batch Start Id: " +
// for each message...
batchId = DMaaPContext.getBatchID();
String responseTransactionId = null;
while ((m = events.next()) != null) {
// LOG.warn("Batch Start Id: " +
// Stamp the message with transaction/log metadata before batching it.
addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
// The last message's transaction id is the one echoed to the client.
responseTransactionId = m.getLogDetails().getTransactionId();
//JSONObject jsonObject = new JSONObject();
//jsonObject.put("msgWrapMR", m.getMessage());
//jsonObject.put("transactionId", responseTransactionId);
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
// check if the batch is full
final int sizeNow = batch.size();
if (sizeNow >= maxEventBatch) {
String startTime = sdf.format(new Date());
LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
// Flush the full batch, then log per-message publisher details.
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
// transactionLogs(batch);
for (message msg : batch) {
LogDetails logDetails = msg.getLogDetails();
LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Mid-batch failure: build a partial-publish error (status/message taken
// from a CambriaApiException cause when available).
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status,
DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
"Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ errorMessages.getPublishMsgCount() + count + "." + errorMsg,
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
metricsSet.publishTick(sizeNow);
publishBatchCount = sizeNow;
String endTime = sdf.format(new Date());
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
+ batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
+ ",Batch End Time=" + endTime + "]");
// New batch id for the next batch.
batchId = DMaaPContext.getBatchID();
// send the pending batch
final int sizeNow = batch.size();
String startTime = sdf.format(new Date());
LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
// ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
for (message msg : batch) {
LogDetails logDetails = msg.getLogDetails();
LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Final-batch failure: same partial-publish error construction as above.
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status,
DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
"Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ errorMessages.getPublishMsgCount() + count + "." + errorMsg,
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
metricsSet.publishTick(sizeNow);
String endTime = sdf.format(new Date());
publishBatchCount = sizeNow;
LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
+ ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
final long endMs = System.currentTimeMillis();
final long totalMs = endMs - startMs;
LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
// Echo the last transaction id back to the publisher via header and body.
if (null != responseTransactionId) {
ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
final JSONObject response = new JSONObject();
response.put("count", count);
response.put("transactionId", responseTransactionId);
response.put("serverTimeMs", totalMs);
respondOk(ctx, response);
// Failure outside the batch loops: generic partial-publish error.
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
if (excp instanceof CambriaApiException) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
errorMsg = (String) errObject.get("message");
ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
"Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ errorMessages.getPublishMsgCount() + count + "." + errorMsg,
null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
ctx.getRequest().getRemoteHost(), null, null);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
684 * @param messageCreationTime
685 * @param messageSequence
687 * @param transactionEnabled
// Attaches per-message transaction metadata (LogDetails) to a message before it
// is batched: topic, creation time, sequence number, batch id and payload length.
// NOTE(review): the final argument line of the generateLogDetails(...) call
// (presumably "transactionEnabled);") is missing from this extract.
private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
final String messageCreationTime, final int messageSequence, final Long batchId,
final boolean transactionEnabled) {
LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
// Payload length recorded for publisher-side accounting/logging.
logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
msg.setTransactionEnabled(transactionEnabled);
msg.setLogDetails(logDetails);
699 void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
700 DMaaPResponseBuilder.respondOk(ctx, response);
705 * @author anowarul.islam
// Small logging helper that prefixes every line with the
// "[topic/consumerGroup/clientId] " identity of the current fetch.
// NOTE(review): the body of info(String) and several closing braces are missing
// from this extract; info presumably delegates to LOG like warn does — confirm.
private static class LogWrap {
// Pre-built "[topic/cgroup/cid] " prefix applied to every log line.
private final String fId;
// Constructor: captures the fetch identity once.
public LogWrap(String topic, String cgroup, String cid) {
fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
// Logs an info-level line with the identity prefix.
public void info(String msg) {
// Logs a warn-level line (with throwable) with the identity prefix.
public void warn(String msg, Exception t) {
LOG.warn(fId + msg, t);
741 public boolean isTransEnabled() {
742 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
743 boolean istransidreqd = false;
744 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
745 istransidreqd = true;
748 return istransidreqd;
// Builds the LogDetails record for one published message: topic, timestamps,
// publisher identity (API key + remote host), batch id, sequence, and server IP.
// NOTE(review): this definition is truncated at the end of the visible extract
// (the transaction-id assignment, return statement and closing brace are not
// shown here).
private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
LogDetails logDetails = new LogDetails();
logDetails.setTopicId(topicName);
logDetails.setMessageTimestamp(messageTimestamp);
logDetails.setPublisherId(Utils.getUserApiKey(request));
logDetails.setPublisherIp(request.getRemoteHost());
logDetails.setMessageBatchId(batchId);
logDetails.setMessageSequence(String.valueOf(messageSequence));
logDetails.setTransactionEnabled(transactionEnabled);
// Timestamp of transaction-id creation, formatted by the shared Utils helper.
logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
logDetails.setServerIp(request.getLocalAddr());