1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.math.NumberUtils;
45 import org.apache.http.HttpStatus;
46 import org.apache.kafka.clients.producer.ProducerRecord;
47 import org.apache.kafka.common.errors.TopicExistsException;
48 import org.json.JSONObject;
49 import org.json.JSONTokener;
50 import org.onap.dmaap.dmf.mr.CambriaApiException;
51 import org.onap.dmaap.dmf.mr.backends.Consumer;
52 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
54 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
55 import org.onap.dmaap.dmf.mr.backends.Publisher;
56 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
57 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
59 import org.onap.dmaap.dmf.mr.beans.LogDetails;
60 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
61 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
64 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
65 import org.onap.dmaap.dmf.mr.metabroker.Topic;
66 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
67 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
68 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.service.EventsService;
72 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
74 import org.onap.dmaap.dmf.mr.utils.Utils;
75 import org.springframework.beans.factory.annotation.Autowired;
76 import org.springframework.stereotype.Service;
79 * This class provides the functionality to publish and subscribe message to
82 * @author Ramkumar Sembaiyam
86 public class EventsServiceImpl implements EventsService {
// Class-wide logger used directly and through the LogWrap helper.
88 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
// Property key looked up through getPropertyFromAJSCmap to override the default publish batch size.
89 private static final String BATCH_LENGTH = "event.batch.length";
// Name of the HTTP request header inspected by isRequestedChunk.
90 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
// Used both as the property key and as the request parameter name in getMessageTimeout.
91 private static final String TIMEOUT_PROPERTY = "timeout";
// Action names handed to authorizeClientWhenNeeded by getEvents and pushEvents respectively.
92 private static final String SUBSCRIBE_ACTION = "sub";
93 private static final String PUBLISH_ACTION = "pub";
// Message catalogue passed to ErrorResponseProvider and used when building publish error responses.
96 private DMaaPErrorMessages errorMessages;
98 String getPropertyFromAJSCmap(String propertyKey) {
99 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
102 public DMaaPErrorMessages getErrorMessages() {
103 return errorMessages;
106 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
107 this.errorMessages = errorMessages;
113 * @param consumerGroup
115 * @throws ConfigDbException,
116 * TopicExistsException, AccessDeniedException,
117 * UnavailableException, CambriaApiException, IOException
// Serves a subscribe (fetch) request for the given topic, consumer group and client id.
// Visible flow: IP blacklist check, topic lookup (error if the topic is unknown), authorization
// for the "sub" action, rate-limiter notification, consumer acquisition, streaming of the
// outbound events, offset commit and metrics update. Failures are rethrown as CambriaApiException.
// NOTE(review): this listing has lost lines (the "try {" and "finally" openers, closing braces and
// some statement continuations), so the exact block structure cannot be confirmed from here.
122 public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
123 throws ConfigDbException, AccessDeniedException, UnavailableException,
124 CambriaApiException, IOException {
126 final long startTime = System.currentTimeMillis();
127 final HttpServletRequest req = ctx.getRequest();
128 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
129 final String remoteHost = req.getRemoteHost();
// One provider builds every error payload for this request, pre-filled with the request identity.
130 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
131 .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
// Throws when the caller's address is on the configured IP blacklist.
133 validateIpBlacklist(errRespProvider, ctx);
135 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
136 if (metaTopic == null) {
137 throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
// The returned flag later forces transaction handling and selects the topic style on the stream.
140 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
142 final long elapsedMs1 = System.currentTimeMillis() - startTime;
143 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
// Fetch options: "pretty" and "meta" come from configuration, timeout/limit/filter from the request.
147 final boolean pretty = isPrettyPrintEnabled();
148 final boolean withMeta = isMetaOffsetEnabled();
149 int timeoutMs = getMessageTimeout(req);
150 int limit = getMessageLimit(req);
151 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
152 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
154 Consumer consumer = null;
156 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
157 final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
// The rate limiter is informed of the call before a consumer is requested from the factory.
158 rl.onCall(topic, consumerGroup, clientId, remoteHost);
159 consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
161 CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
162 .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
163 coes.setDmaapContext(ctx);
164 coes.setTopic(metaTopic);
165 coes.setTransEnabled(isTransEnabled() || isAAFTopic);
166 coes.setTopicStyle(isAAFTopic);
167 final long elapsedMs2 = System.currentTimeMillis() - startTime;
168 logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
169 + consumerGroup + " " + clientId);
171 respondOkWithStream(ctx, coes);
172 // No IOException thrown during respondOkWithStream, so commit the
173 // new offsets to all the brokers
174 consumer.commitOffsets();
// The number of messages actually sent feeds both the metrics and the rate limiter.
175 final int sent = coes.getSentCount();
176 metricsSet.consumeTick(sent);
177 rl.onSend(topic, consumerGroup, clientId, sent);
178 final long elapsedMs = System.currentTimeMillis() - startTime;
179 logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
180 + topic + " " + consumerGroup + " " + clientId + " on to the server "
// Consumer factory unavailable: reported with the service-unavailable error payload.
183 } catch (UnavailableException excp) {
184 logger.warn(excp.getMessage(), excp);
185 ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
186 LOG.info(errRes.toString());
187 throw new CambriaApiException(errRes);
189 } catch (ConcurrentModificationException excp1) {
190 LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
191 ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
192 logger.info(errRes.toString());
193 throw new CambriaApiException(errRes);
// Any other failure: the consumer is destroyed through the factory before the generic error is thrown.
195 } catch (Exception excp) {
196 logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
197 + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
198 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
199 ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
200 logger.info(errRes.toString());
201 throw new CambriaApiException(errRes);
// NOTE(review): presumably the cleanup section that closes the consumer when caching is disabled;
// the statement performing the close is not visible in this listing.
203 if (consumer != null && !isCacheEnabled()) {
206 } catch (Exception e) {
207 logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
208 + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
215 private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
216 final String remoteAddr = Utils.getRemoteAddress(ctx);
217 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
218 ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
219 LOG.info(errRes.toString());
220 throw new CambriaApiException(errRes);
// Authorizes the caller for the given action ("sub" or "pub") on a topic and returns a flag
// named isAAFTopic. Visible logic: nothing is checked when the topic name equals the metrics
// topic name (case-insensitive); when CADI is enabled and the topic name starts with the enforced
// AAF prefix, the AAF permission is checked; otherwise, if the topic has a non-empty owner, the
// authenticated API key must pass the topic's read (subscribe) or write (publish) check.
// NOTE(review): the lines that set isAAFTopic to true and return it are missing from this listing.
224 private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
225 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
227 boolean isAAFTopic = false;
228 String metricTopicName = getMetricTopicName();
229 if(!metricTopicName.equalsIgnoreCase(topicName)) {
230 if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
// AAF path: build the permission string for topic+action and reject the request if it is not granted.
232 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
233 String permission = aaf.aafPermissionString(topicName, action);
234 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
235 ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
236 LOG.info(errRes.toString());
237 throw new DMaaPAccessDeniedException(errRes);
// API-key path: only applies to topics that declare an owner.
240 } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
241 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
242 if(SUBSCRIBE_ACTION.equals(action)) {
243 metaTopic.checkUserRead(user);
244 } else if(PUBLISH_ACTION.equals(action)) {
245 metaTopic.checkUserWrite(user);
252 boolean isCadiEnabled() {
253 return Utils.isCadiEnabled();
256 void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
257 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
258 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
261 private int getMessageLimit(HttpServletRequest request) {
262 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
265 private int getMessageTimeout(HttpServletRequest request) {
266 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
267 int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
268 CambriaConstants.kNoTimeout;
270 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
271 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
274 private boolean isPrettyPrintEnabled() {
275 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
278 private boolean isMetaOffsetEnabled() {
279 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
282 private boolean isTopicNameEnforcedAaf(String topicName) {
283 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
284 return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd);
287 private boolean isCacheEnabled() {
288 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
289 return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
292 private void verifyHostId() {
293 String lhostId = getPropertyFromAJSCmap("clusterhostid");
294 if (lhostId.isEmpty()) {
296 InetAddress.getLocalHost().getCanonicalHostName();
297 } catch (UnknownHostException e) {
298 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
304 private String getMetricTopicName() {
305 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
306 return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
310 * @throws missingReqdSetting
// Entry point for publishing: validates the caller, authorizes the "pub" action and then
// dispatches to the transactional or the plain publish path.
// Visible flow: IP blacklist check, topic lookup, authorization, detection of chunked transfer
// and media type, dispatch, and a final log line with the overall elapsed time.
// NOTE(review): lines are missing from this listing after the topic lookup and around the
// dispatch (presumably the "else" branch), so the exact structure cannot be confirmed from here.
314 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
315 final String requestTime) throws ConfigDbException, AccessDeniedException,
316 CambriaApiException, IOException, missingReqdSetting {
318 final long startMs = System.currentTimeMillis();
319 String remoteHost = ctx.getRequest().getRemoteHost();
// The remote host is recorded both as remote host and as publisher IP in error payloads.
320 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
321 .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
322 .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
324 validateIpBlacklist(errRespProvider, ctx);
326 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
329 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
331 final HttpServletRequest req = ctx.getRequest();
332 boolean chunked = isRequestedChunk(req);
333 String mediaType = getMediaType(req);
334 boolean transactionRequired = isTransactionIdRequired();
// AAF topics always take the transactional path; other topics only when configuration requires it.
336 if (isAAFTopic || transactionRequired) {
337 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
339 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
342 final long endMs = System.currentTimeMillis();
343 final long totalMs = endMs - startMs;
344 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
347 private boolean isRequestedChunk(HttpServletRequest request) {
348 return null != request.getHeader(TRANSFER_ENCODING) &&
349 request.getHeader(TRANSFER_ENCODING).contains("chunked");
352 private String getMediaType(HttpServletRequest request) {
353 String mediaType = request.getContentType();
354 if (mediaType == null || mediaType.length() == 0) {
355 return MimeTypes.kAppGenericBinary;
357 return mediaType.replace("; charset=UTF-8", "").trim();
360 private boolean isTransactionIdRequired() {
361 return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true");
369 * @param defaultPartition
372 * @throws ConfigDbException
373 * @throws AccessDeniedException
374 * @throws TopicExistsException
375 * @throws CambriaApiException
376 * @throws IOException
// Plain (non-transactional) publish path: reads messages from the request body, sends them to
// the publisher in batches, ticks the publish metric and answers with a JSON object holding
// "count" and "serverTimeMs". Failures are rethrown as CambriaApiException carrying a
// PARTIAL_PUBLISH_MSGS error response.
// NOTE(review): this listing has lost lines (the end of the parameter list, the statements that
// fill and clear the batch, the declaration of "count", "try {" openers and closing braces).
378 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
380 throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
381 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
382 // setup the event set
383 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
385 // start processing, building a batch to push to the backend
386 final long startMs = System.currentTimeMillis();
// Default batch size is 16384 messages unless the "event.batch.length" property overrides it.
388 long maxEventBatch = 1024L* 16;
389 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
390 if (null != batchlen && !batchlen.isEmpty())
391 maxEventBatch = Long.parseLong(batchlen);
392 // long maxEventBatch =
394 final LinkedList<Publisher.message> batch = new LinkedList<>();
395 // final ArrayList<KeyedMessage<String, String>> kms = new
397 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
399 // for each message...
400 Publisher.message m = null;
401 while ((m = events.next()) != null) {
402 // add the message to the batch
404 // final KeyedMessage<String, String> data = new
405 // KeyedMessage<String, String>(topic, m.getKey(),
408 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
412 // check if the batch is full
413 final int sizeNow = batch.size();
// NOTE(review): uses ">" here while pushEventsWithTransaction uses ">=" for the same check.
414 if (sizeNow > maxEventBatch) {
415 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
418 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
421 metricsSet.publishTick(sizeNow);
426 // send the pending batch
427 final int sizeNow = batch.size();
429 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
432 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
435 metricsSet.publishTick(sizeNow);
439 final long endMs = System.currentTimeMillis();
440 final long totalMs = endMs - startMs;
442 LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
443 + ctx.getRequest().getRemoteHost());
446 final JSONObject response = new JSONObject();
447 response.put("count", count);
448 response.put("serverTimeMs", totalMs);
449 respondOk(ctx, response);
// Error mapping: status defaults to 404; for a CambriaApiException the status and the "message"
// field of its JSON body are used instead.
451 } catch (Exception excp) {
452 int status = HttpStatus.SC_NOT_FOUND;
453 String errorMsg = null;
454 if (excp instanceof CambriaApiException) {
455 status = ((CambriaApiException) excp).getStatus();
456 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
457 JSONObject errObject = new JSONObject(jsonTokener);
458 errorMsg = (String) errObject.get("message");
461 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
462 errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
464 null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
466 LOG.info(errRes.toString());
467 throw new CambriaApiException(errRes);
477 * @param partitionKey
481 * @throws ConfigDbException
482 * @throws AccessDeniedException
483 * @throws TopicExistsException
484 * @throws IOException
485 * @throws CambriaApiException
// Transactional publish path: every message read from the request body is stamped with
// transaction/log details, messages are sent to the publisher in batches, batch start/end
// details and per-message publisher log details are logged, and the response carries the last
// transaction id both as a "transactionId" header and in the JSON body next to "count" and
// "serverTimeMs". Failures are rethrown as CambriaApiException carrying a PARTIAL_PUBLISH_MSGS
// error response whose message is prefixed with "Transaction-".
// NOTE(review): this listing has lost lines (declarations of "count" and "batchId", the
// statements that fill and clear the batch, "try {" openers, closing braces and several
// statement continuations), so the exact block structure cannot be confirmed from here.
487 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
488 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
489 throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
491 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
493 // setup the event set
494 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
496 // start processing, building a batch to push to the backend
497 final long startMs = System.currentTimeMillis();
// Default batch size is 16384 messages unless the "event.batch.length" property overrides it.
499 long maxEventBatch = 1024L * 16;
500 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
501 if (null != evenlen && !evenlen.isEmpty())
502 maxEventBatch = Long.parseLong(evenlen);
503 // final long maxEventBatch =
505 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
506 // final ArrayList<KeyedMessage<String, String>> kms = new
508 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
509 Publisher.message m = null;
510 int messageSequence = 1;
// Always true on this path; passed on when stamping each message.
512 final boolean transactionEnabled = true;
513 int publishBatchCount = 0;
// Formatter for the batch start/end timestamps written to the log; created per call.
514 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
516 // LOG.warn("Batch Start Id: " +
519 // for each message...
520 batchId = DMaaPContext.getBatchID();
522 String responseTransactionId = null;
524 while ((m = events.next()) != null) {
526 // LOG.warn("Batch Start Id: " +
529 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
// Overwritten on every iteration, so the response reports the id of the last message read.
536 responseTransactionId = m.getLogDetails().getTransactionId();
538 //JSONObject jsonObject = new JSONObject();
539 //jsonObject.put("msgWrapMR", m.getMessage());
540 //jsonObject.put("transactionId", responseTransactionId);
541 // final KeyedMessage<String, String> data = new
542 // KeyedMessage<String, String>(topic, m.getKey(),
545 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
549 // check if the batch is full
550 final int sizeNow = batch.size();
// The batch is flushed as soon as it holds at least maxEventBatch messages.
551 if (sizeNow >= maxEventBatch) {
552 String startTime = sdf.format(new Date());
553 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
556 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
558 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
559 // transactionLogs(batch);
560 for (message msg : batch) {
561 LogDetails logDetails = msg.getLogDetails();
562 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Error mapping: status defaults to 404; for a CambriaApiException the status and the "message"
// field of its JSON body are used instead.
564 } catch (Exception excp) {
566 int status = HttpStatus.SC_NOT_FOUND;
567 String errorMsg = null;
568 if (excp instanceof CambriaApiException) {
569 status = ((CambriaApiException) excp).getStatus();
570 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
571 JSONObject errObject = new JSONObject(jsonTokener);
572 errorMsg = (String) errObject.get("message");
574 ErrorResponse errRes = new ErrorResponse(status,
575 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
576 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
577 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
578 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
579 ctx.getRequest().getRemoteHost(), null, null);
580 LOG.info(errRes.toString());
581 throw new CambriaApiException(errRes);
585 metricsSet.publishTick(sizeNow);
586 publishBatchCount = sizeNow;
589 String endTime = sdf.format(new Date());
590 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
591 + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
592 + ",Batch End Time=" + endTime + "]");
// A fresh batch id is requested for the next batch.
593 batchId = DMaaPContext.getBatchID();
597 // send the pending batch
598 final int sizeNow = batch.size();
600 String startTime = sdf.format(new Date());
601 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
604 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
606 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
608 for (message msg : batch) {
609 LogDetails logDetails = msg.getLogDetails();
610 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
612 } catch (Exception excp) {
613 int status = HttpStatus.SC_NOT_FOUND;
614 String errorMsg = null;
615 if (excp instanceof CambriaApiException) {
616 status = ((CambriaApiException) excp).getStatus();
617 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
618 JSONObject errObject = new JSONObject(jsonTokener);
619 errorMsg = (String) errObject.get("message");
622 ErrorResponse errRes = new ErrorResponse(status,
623 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
624 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
625 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
626 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
627 ctx.getRequest().getRemoteHost(), null, null);
628 LOG.info(errRes.toString());
629 throw new CambriaApiException(errRes);
632 metricsSet.publishTick(sizeNow);
635 String endTime = sdf.format(new Date());
636 publishBatchCount = sizeNow;
637 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
638 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
642 final long endMs = System.currentTimeMillis();
643 final long totalMs = endMs - startMs;
645 LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
// The header is only set when at least one message produced a transaction id.
647 if (null != responseTransactionId) {
648 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
652 final JSONObject response = new JSONObject();
653 response.put("count", count);
654 response.put("transactionId", responseTransactionId);
655 response.put("serverTimeMs", totalMs);
656 respondOk(ctx, response);
658 } catch (Exception excp) {
659 int status = HttpStatus.SC_NOT_FOUND;
660 String errorMsg = null;
661 if (excp instanceof CambriaApiException) {
662 status = ((CambriaApiException) excp).getStatus();
663 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
664 JSONObject errObject = new JSONObject(jsonTokener);
665 errorMsg = (String) errObject.get("message");
668 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
669 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
670 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
671 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
672 ctx.getRequest().getRemoteHost(), null, null);
673 LOG.info(errRes.toString());
674 throw new CambriaApiException(errRes);
683 * @param messageCreationTime
684 * @param messageSequence
686 * @param transactionEnabled
// Stamps a message with freshly generated log details: builds them through generateLogDetails,
// records the message length in bytes, then stores the transaction flag and the details on
// the message.
// NOTE(review): the continuation of the generateLogDetails call is missing from this listing.
688 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
689 final String messageCreationTime, final int messageSequence, final Long batchId,
690 final boolean transactionEnabled) {
691 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
693 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
694 msg.setTransactionEnabled(transactionEnabled);
695 msg.setLogDetails(logDetails);
698 void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
699 DMaaPResponseBuilder.respondOk(ctx, response);
704 * @author anowarul.islam
// Small logging helper that prefixes messages with "[topic/consumerGroup/clientId] ".
// NOTE(review): the body of info(...) and the closing braces are missing from this listing;
// warn(...) visibly forwards the prefixed message and the exception to the class logger.
707 private static class LogWrap {
// Prefix computed once in the constructor.
708 private final String fId;
711 * constructor initialization
717 public LogWrap(String topic, String cgroup, String cid) {
718 fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
725 public void info(String msg) {
734 public void warn(String msg, Exception t) {
735 LOG.warn(fId + msg, t);
740 public boolean isTransEnabled() {
741 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
742 boolean istransidreqd = false;
743 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
744 istransidreqd = true;
747 return istransidreqd;
751 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
752 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
753 LogDetails logDetails = new LogDetails();
754 logDetails.setTopicId(topicName);
755 logDetails.setMessageTimestamp(messageTimestamp);
756 logDetails.setPublisherId(Utils.getUserApiKey(request));
757 logDetails.setPublisherIp(request.getRemoteHost());
758 logDetails.setMessageBatchId(batchId);
759 logDetails.setMessageSequence(String.valueOf(messageSequence));
760 logDetails.setTransactionEnabled(transactionEnabled);
761 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
762 logDetails.setServerIp(request.getLocalAddr());