1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.StringUtils;
45 import org.apache.commons.lang.math.NumberUtils;
46 import org.apache.http.HttpStatus;
47 import org.apache.kafka.clients.producer.ProducerRecord;
48 import org.apache.kafka.common.errors.TopicExistsException;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51 import org.onap.dmaap.dmf.mr.CambriaApiException;
52 import org.onap.dmaap.dmf.mr.backends.Consumer;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
54 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
55 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
56 import org.onap.dmaap.dmf.mr.backends.Publisher;
57 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
59 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
60 import org.onap.dmaap.dmf.mr.beans.LogDetails;
61 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
64 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
65 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
66 import org.onap.dmaap.dmf.mr.metabroker.Topic;
67 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
68 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
72 import org.onap.dmaap.dmf.mr.service.EventsService;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
74 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
75 import org.onap.dmaap.dmf.mr.utils.Utils;
76 import org.springframework.beans.factory.annotation.Autowired;
77 import org.springframework.stereotype.Service;
80 * This class provides the functinality to publish and subscribe message to
83 * @author Ramkumar Sembaiyam
87 public class EventsServiceImpl implements EventsService {
89 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
90 private static final String BATCH_LENGTH = "event.batch.length";
91 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
92 private static final String TIMEOUT_PROPERTY = "timeout";
93 private static final String SUBSCRIBE_ACTION = "sub";
94 private static final String PUBLISH_ACTION = "pub";
97 private DMaaPErrorMessages errorMessages;
99 String getPropertyFromAJSCmap(String propertyKey) {
100 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
103 public DMaaPErrorMessages getErrorMessages() {
104 return errorMessages;
107 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
108 this.errorMessages = errorMessages;
114 * @param consumerGroup
116 * @throws ConfigDbException,
117 * TopicExistsException, AccessDeniedException,
118 * UnavailableException, CambriaApiException, IOException
123 public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
124 throws ConfigDbException, AccessDeniedException, UnavailableException,
125 CambriaApiException, IOException {
127 final long startTime = System.currentTimeMillis();
128 final HttpServletRequest req = ctx.getRequest();
129 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
130 final String remoteHost = req.getRemoteHost();
131 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
132 .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
134 validateIpBlacklist(errRespProvider, ctx);
136 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
137 if (metaTopic == null) {
138 throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
141 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
143 final long elapsedMs1 = System.currentTimeMillis() - startTime;
144 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
148 final boolean pretty = isPrettyPrintEnabled();
149 final boolean withMeta = isMetaOffsetEnabled();
150 int timeoutMs = getMessageTimeout(req);
151 int limit = getMessageLimit(req);
152 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
153 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
155 Consumer consumer = null;
157 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
158 final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
159 rl.onCall(topic, consumerGroup, clientId, remoteHost);
160 consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
162 CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
163 .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
164 coes.setDmaapContext(ctx);
165 coes.setTopic(metaTopic);
166 coes.setTransEnabled(isTransEnabled() || isAAFTopic);
167 coes.setTopicStyle(isAAFTopic);
168 final long elapsedMs2 = System.currentTimeMillis() - startTime;
169 logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
170 + consumerGroup + " " + clientId);
172 respondOkWithStream(ctx, coes);
173 // No IOException thrown during respondOkWithStream, so commit the
174 // new offsets to all the brokers
175 consumer.commitOffsets();
176 final int sent = coes.getSentCount();
177 metricsSet.consumeTick(sent);
178 rl.onSend(topic, consumerGroup, clientId, sent);
179 final long elapsedMs = System.currentTimeMillis() - startTime;
180 logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
181 + topic + " " + consumerGroup + " " + clientId + " on to the server "
184 } catch (UnavailableException excp) {
185 logger.warn(excp.getMessage(), excp);
186 ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
187 LOG.info(errRes.toString());
188 throw new CambriaApiException(errRes);
190 } catch (ConcurrentModificationException excp1) {
191 LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
192 ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
193 logger.info(errRes.toString());
194 throw new CambriaApiException(errRes);
196 } catch (Exception excp) {
197 logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
198 + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
199 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
200 ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
201 logger.info(errRes.toString());
202 throw new CambriaApiException(errRes);
204 if (consumer != null && !isCacheEnabled()) {
207 } catch (Exception e) {
208 logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
209 + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
216 private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
217 final String remoteAddr = Utils.getRemoteAddress(ctx);
218 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
219 ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
220 LOG.info(errRes.toString());
221 throw new CambriaApiException(errRes);
225 private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
226 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
228 boolean isAAFTopic = false;
229 String metricTopicName = getMetricTopicName();
230 if(!metricTopicName.equalsIgnoreCase(topicName)) {
231 if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
233 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
234 String permission = aaf.aafPermissionString(topicName, action);
235 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
236 ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
237 LOG.info(errRes.toString());
238 throw new DMaaPAccessDeniedException(errRes);
241 } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
242 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
243 if(SUBSCRIBE_ACTION.equals(action)) {
244 metaTopic.checkUserRead(user);
245 } else if(PUBLISH_ACTION.equals(action)) {
246 metaTopic.checkUserWrite(user);
253 boolean isCadiEnabled() {
254 return Utils.isCadiEnabled();
257 void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
258 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
259 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
262 private int getMessageLimit(HttpServletRequest request) {
263 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
266 private int getMessageTimeout(HttpServletRequest request) {
267 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
268 int defaultTimeoutMs = StringUtils.isNotEmpty(timeoutMsAsString) ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
269 CambriaConstants.kNoTimeout;
271 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
272 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
275 private boolean isPrettyPrintEnabled() {
276 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
279 private boolean isMetaOffsetEnabled() {
280 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
283 private boolean isTopicNameEnforcedAaf(String topicName) {
284 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
285 return StringUtils.isNotEmpty(topicNameStd) && topicName.startsWith(topicNameStd);
288 private boolean isCacheEnabled() {
289 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
290 return StringUtils.isNotEmpty(cachePropsSetting) ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
293 private void verifyHostId() {
294 String lhostId = getPropertyFromAJSCmap("clusterhostid");
295 if (StringUtils.isEmpty(lhostId)) {
297 InetAddress.getLocalHost().getCanonicalHostName();
298 } catch (UnknownHostException e) {
299 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
305 private String getMetricTopicName() {
306 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
307 return StringUtils.isNotEmpty(metricTopicFromProps) ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
311 * @throws missingReqdSetting
315 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
316 final String requestTime) throws ConfigDbException, AccessDeniedException,
317 CambriaApiException, IOException, missingReqdSetting {
319 final long startMs = System.currentTimeMillis();
320 String remoteHost = ctx.getRequest().getRemoteHost();
321 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
322 .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
323 .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
325 validateIpBlacklist(errRespProvider, ctx);
327 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
328 if (metaTopic == null) {
329 throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
332 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
334 final HttpServletRequest req = ctx.getRequest();
335 boolean chunked = isRequestedChunk(req);
336 String mediaType = getMediaType(req);
337 boolean transactionRequired = isTransactionIdRequired();
339 if (isAAFTopic || transactionRequired) {
340 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
342 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
345 final long endMs = System.currentTimeMillis();
346 final long totalMs = endMs - startMs;
347 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
350 private boolean isRequestedChunk(HttpServletRequest request) {
351 return null != request.getHeader(TRANSFER_ENCODING) &&
352 request.getHeader(TRANSFER_ENCODING).contains("chunked");
355 private String getMediaType(HttpServletRequest request) {
356 String mediaType = request.getContentType();
357 if (mediaType == null || mediaType.length() == 0) {
358 return MimeTypes.kAppGenericBinary;
360 return mediaType.replace("; charset=UTF-8", "").trim();
363 private boolean isTransactionIdRequired() {
364 String transIdReqProperty = getPropertyFromAJSCmap("transidUEBtopicreqd");
365 return StringUtils.isNotEmpty(transIdReqProperty) && transIdReqProperty.equalsIgnoreCase("true");
373 * @param defaultPartition
376 * @throws ConfigDbException
377 * @throws AccessDeniedException
378 * @throws TopicExistsException
379 * @throws CambriaApiException
380 * @throws IOException
382 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
384 throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
385 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
386 // setup the event set
387 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
389 // start processing, building a batch to push to the backend
390 final long startMs = System.currentTimeMillis();
392 long maxEventBatch = 1024L* 16;
393 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
394 if (null != batchlen && !batchlen.isEmpty())
395 maxEventBatch = Long.parseLong(batchlen);
396 // long maxEventBatch =
398 final LinkedList<Publisher.message> batch = new LinkedList<>();
399 // final ArrayList<KeyedMessage<String, String>> kms = new
401 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
403 // for each message...
404 Publisher.message m = null;
405 while ((m = events.next()) != null) {
406 // add the message to the batch
408 // final KeyedMessage<String, String> data = new
409 // KeyedMessage<String, String>(topic, m.getKey(),
412 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
416 // check if the batch is full
417 final int sizeNow = batch.size();
418 if (sizeNow > maxEventBatch) {
419 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
422 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
425 metricsSet.publishTick(sizeNow);
430 // send the pending batch
431 final int sizeNow = batch.size();
433 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
436 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
439 metricsSet.publishTick(sizeNow);
443 final long endMs = System.currentTimeMillis();
444 final long totalMs = endMs - startMs;
446 LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
447 + ctx.getRequest().getRemoteHost());
450 final JSONObject response = new JSONObject();
451 response.put("count", count);
452 response.put("serverTimeMs", totalMs);
453 respondOk(ctx, response);
455 } catch (Exception excp) {
456 int status = HttpStatus.SC_NOT_FOUND;
457 String errorMsg = null;
458 if (excp instanceof CambriaApiException) {
459 status = ((CambriaApiException) excp).getStatus();
460 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
461 JSONObject errObject = new JSONObject(jsonTokener);
462 errorMsg = (String) errObject.get("message");
465 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
466 errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
468 null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
470 LOG.info(errRes.toString());
471 throw new CambriaApiException(errRes);
481 * @param partitionKey
485 * @throws ConfigDbException
486 * @throws AccessDeniedException
487 * @throws TopicExistsException
488 * @throws IOException
489 * @throws CambriaApiException
491 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
492 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
493 throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
495 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
497 // setup the event set
498 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
500 // start processing, building a batch to push to the backend
501 final long startMs = System.currentTimeMillis();
503 long maxEventBatch = 1024L * 16;
504 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
505 if (null != evenlen && !evenlen.isEmpty())
506 maxEventBatch = Long.parseLong(evenlen);
507 // final long maxEventBatch =
509 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
510 // final ArrayList<KeyedMessage<String, String>> kms = new
512 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
513 Publisher.message m = null;
514 int messageSequence = 1;
516 final boolean transactionEnabled = true;
517 int publishBatchCount = 0;
518 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
520 // LOG.warn("Batch Start Id: " +
523 // for each message...
524 batchId = DMaaPContext.getBatchID();
526 String responseTransactionId = null;
528 while ((m = events.next()) != null) {
530 // LOG.warn("Batch Start Id: " +
533 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
540 responseTransactionId = m.getLogDetails().getTransactionId();
542 //JSONObject jsonObject = new JSONObject();
543 //jsonObject.put("msgWrapMR", m.getMessage());
544 //jsonObject.put("transactionId", responseTransactionId);
545 // final KeyedMessage<String, String> data = new
546 // KeyedMessage<String, String>(topic, m.getKey(),
549 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
553 // check if the batch is full
554 final int sizeNow = batch.size();
555 if (sizeNow >= maxEventBatch) {
556 String startTime = sdf.format(new Date());
557 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
560 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
562 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
563 // transactionLogs(batch);
564 for (message msg : batch) {
565 LogDetails logDetails = msg.getLogDetails();
566 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
568 } catch (Exception excp) {
570 int status = HttpStatus.SC_NOT_FOUND;
571 String errorMsg = null;
572 if (excp instanceof CambriaApiException) {
573 status = ((CambriaApiException) excp).getStatus();
574 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
575 JSONObject errObject = new JSONObject(jsonTokener);
576 errorMsg = (String) errObject.get("message");
578 ErrorResponse errRes = new ErrorResponse(status,
579 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
580 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
581 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
582 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
583 ctx.getRequest().getRemoteHost(), null, null);
584 LOG.info(errRes.toString());
585 throw new CambriaApiException(errRes);
589 metricsSet.publishTick(sizeNow);
590 publishBatchCount = sizeNow;
593 String endTime = sdf.format(new Date());
594 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
595 + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
596 + ",Batch End Time=" + endTime + "]");
597 batchId = DMaaPContext.getBatchID();
601 // send the pending batch
602 final int sizeNow = batch.size();
604 String startTime = sdf.format(new Date());
605 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
608 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
610 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
612 for (message msg : batch) {
613 LogDetails logDetails = msg.getLogDetails();
614 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
616 } catch (Exception excp) {
617 int status = HttpStatus.SC_NOT_FOUND;
618 String errorMsg = null;
619 if (excp instanceof CambriaApiException) {
620 status = ((CambriaApiException) excp).getStatus();
621 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
622 JSONObject errObject = new JSONObject(jsonTokener);
623 errorMsg = (String) errObject.get("message");
626 ErrorResponse errRes = new ErrorResponse(status,
627 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
628 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
629 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
630 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
631 ctx.getRequest().getRemoteHost(), null, null);
632 LOG.info(errRes.toString());
633 throw new CambriaApiException(errRes);
636 metricsSet.publishTick(sizeNow);
639 String endTime = sdf.format(new Date());
640 publishBatchCount = sizeNow;
641 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
642 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
646 final long endMs = System.currentTimeMillis();
647 final long totalMs = endMs - startMs;
649 LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
651 if (null != responseTransactionId) {
652 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
656 final JSONObject response = new JSONObject();
657 response.put("count", count);
658 response.put("transactionId", responseTransactionId);
659 response.put("serverTimeMs", totalMs);
660 respondOk(ctx, response);
662 } catch (Exception excp) {
663 int status = HttpStatus.SC_NOT_FOUND;
664 String errorMsg = null;
665 if (excp instanceof CambriaApiException) {
666 status = ((CambriaApiException) excp).getStatus();
667 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
668 JSONObject errObject = new JSONObject(jsonTokener);
669 errorMsg = (String) errObject.get("message");
672 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
673 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
674 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
675 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
676 ctx.getRequest().getRemoteHost(), null, null);
677 LOG.info(errRes.toString());
678 throw new CambriaApiException(errRes);
687 * @param messageCreationTime
688 * @param messageSequence
690 * @param transactionEnabled
692 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
693 final String messageCreationTime, final int messageSequence, final Long batchId,
694 final boolean transactionEnabled) {
695 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
697 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
698 msg.setTransactionEnabled(transactionEnabled);
699 msg.setLogDetails(logDetails);
702 void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
703 DMaaPResponseBuilder.respondOk(ctx, response);
708 * @author anowarul.islam
711 private static class LogWrap {
712 private final String fId;
715 * constructor initialization
721 public LogWrap(String topic, String cgroup, String cid) {
722 fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
729 public void info(String msg) {
738 public void warn(String msg, Exception t) {
739 LOG.warn(fId + msg, t);
744 public boolean isTransEnabled() {
745 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
746 boolean istransidreqd = false;
747 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
748 istransidreqd = true;
751 return istransidreqd;
755 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
756 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
757 LogDetails logDetails = new LogDetails();
758 logDetails.setTopicId(topicName);
759 logDetails.setMessageTimestamp(messageTimestamp);
760 logDetails.setPublisherId(Utils.getUserApiKey(request));
761 logDetails.setPublisherIp(request.getRemoteHost());
762 logDetails.setMessageBatchId(batchId);
763 logDetails.setMessageSequence(String.valueOf(messageSequence));
764 logDetails.setTransactionEnabled(transactionEnabled);
765 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
766 logDetails.setServerIp(request.getLocalAddr());