1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.math.NumberUtils;
45 import org.apache.http.HttpStatus;
46 import org.apache.kafka.clients.producer.ProducerRecord;
47 import org.apache.kafka.common.errors.TopicExistsException;
48 import org.json.JSONObject;
49 import org.json.JSONTokener;
50 import org.onap.dmaap.dmf.mr.CambriaApiException;
51 import org.onap.dmaap.dmf.mr.backends.Consumer;
52 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
54 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
55 import org.onap.dmaap.dmf.mr.backends.Publisher;
56 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
57 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
59 import org.onap.dmaap.dmf.mr.beans.LogDetails;
60 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
61 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
64 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
65 import org.onap.dmaap.dmf.mr.metabroker.Topic;
66 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
67 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
68 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.service.EventsService;
72 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
74 import org.onap.dmaap.dmf.mr.utils.Utils;
75 import org.springframework.beans.factory.annotation.Autowired;
76 import org.springframework.stereotype.Service;
79 * This class provides the functionality to publish and subscribe message to
82 * @author Ramkumar Sembaiyam
86 public class EventsServiceImpl implements EventsService {
// Logger shared by every method of this service (and by the nested LogWrap helper).
88 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
// Property key read by both push paths to override the default maximum batch size.
89 private static final String BATCH_LENGTH = "event.batch.length";
// HTTP request header inspected by isRequestedChunk.
90 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
// Used both as the property key and as the request parameter name in getMessageTimeout.
91 private static final String TIMEOUT_PROPERTY = "timeout";
// Action names passed to authorizeClientWhenNeeded by getEvents ("sub") and pushEvents ("pub").
92 private static final String SUBSCRIBE_ACTION = "sub";
93 private static final String PUBLISH_ACTION = "pub";
// Source of the message texts used when building error responses; accessible via getter/setter.
96 private DMaaPErrorMessages errorMessages;
98 String getPropertyFromAJSCmap(String propertyKey) {
99 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
102 public DMaaPErrorMessages getErrorMessages() {
103 return errorMessages;
106 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
107 this.errorMessages = errorMessages;
113 * @param consumerGroup
115 * @throws ConfigDbException,
116 * TopicExistsException, AccessDeniedException,
117 * UnavailableException, CambriaApiException, IOException
// Handles a consumer fetch for one topic / consumer group / client id: validates the caller,
// obtains a consumer, streams the events to the HTTP response, commits the offsets and records
// metrics. Failures are rethrown as CambriaApiException carrying an ErrorResponse.
// NOTE(review): several physical lines of this method (the opening of the try block, some
// statement continuations and closing braces) are not visible in this view; the comments below
// describe only the code that is visible.
122 public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
123 throws ConfigDbException, AccessDeniedException, UnavailableException,
124 CambriaApiException, IOException {
126 final long startTime = System.currentTimeMillis();
127 final HttpServletRequest req = ctx.getRequest();
128 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
129 final String remoteHost = req.getRemoteHost();
// Error responses built below all carry the topic, consumer group, client id and remote host.
130 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
131 .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
// Throws CambriaApiException when the caller's address is on the IP blacklist.
133 validateIpBlacklist(errRespProvider, ctx);
// A topic unknown to the meta broker is reported with the provider's topic-not-found error.
135 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
136 if (metaTopic == null) {
137 throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
// Enforces authorization for the "sub" action; the result is reused for the stream settings below.
140 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
142 final long elapsedMs1 = System.currentTimeMillis() - startTime;
143 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
// NOTE(review): the tail of the logging statement above is not visible in this view.
// Fetch options: formatting flags come from properties; timeout, limit and filter are read from
// the request (the timeout falls back to a configured default, see getMessageTimeout).
147 final boolean pretty = isPrettyPrintEnabled();
148 final boolean withMeta = isMetaOffsetEnabled();
149 int timeoutMs = getMessageTimeout(req);
150 int limit = getMessageLimit(req);
151 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
152 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
// Declared outside the (not visible) try so that the cleanup code at the end can reach it.
154 Consumer consumer = null;
156 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
157 final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
// The limiter is informed of the call before a consumer is requested from the factory.
158 rl.onCall(topic, consumerGroup, clientId, remoteHost);
159 consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
// NOTE(review): the last argument(s) of getConsumerFor are not visible in this view.
161 CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
162 .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
163 coes.setDmaapContext(ctx);
164 coes.setTopic(metaTopic);
// Transaction handling is switched on when globally enabled or when the topic is an AAF topic.
165 coes.setTransEnabled(isTransEnabled() || isAAFTopic);
166 coes.setTopicStyle(isAAFTopic);
167 final long elapsedMs2 = System.currentTimeMillis() - startTime;
168 logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
169 + consumerGroup + " " + clientId);
171 respondOkWithStream(ctx, coes);
172 // No IOException thrown during respondOkWithStream, so commit the
173 // new offsets to all the brokers
174 consumer.commitOffsets();
// The number of events actually sent feeds both the metrics set and the limiter.
175 final int sent = coes.getSentCount();
176 metricsSet.consumeTick(sent);
177 rl.onSend(topic, consumerGroup, clientId, sent);
178 final long elapsedMs = System.currentTimeMillis() - startTime;
179 logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
180 + topic + " " + consumerGroup + " " + clientId + " on to the server "
// NOTE(review): the tail of the logging statement above is not visible in this view.
// Consumer factory unavailable: logged with the stack trace and reported as a service-unavailable error.
183 } catch (UnavailableException excp) {
184 logger.warn(excp.getMessage(), excp);
185 ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
186 LOG.info(errRes.toString());
187 throw new CambriaApiException(errRes);
// Concurrent modification is reported with its own dedicated error response.
189 } catch (ConcurrentModificationException excp1) {
190 LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
191 ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
192 logger.info(errRes.toString());
193 throw new CambriaApiException(errRes);
// Any other failure: the consumer is destroyed through the factory before a generic error is raised.
195 } catch (Exception excp) {
196 logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
197 + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
198 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
199 ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
200 logger.info(errRes.toString());
201 throw new CambriaApiException(errRes);
// Cleanup applies only when a consumer was obtained and consumer caching is disabled.
// NOTE(review): the guarded statements are not visible in this view; the log text below suggests
// the consumer is closed inside a finally block - confirm against the full file.
203 if (consumer != null && !isCacheEnabled()) {
206 } catch (Exception e) {
207 logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
208 + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
215 private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
216 final String remoteAddr = Utils.getRemoteAddress(ctx);
217 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
218 ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
219 LOG.info(errRes.toString());
220 throw new CambriaApiException(errRes);
// Picks the authorization scheme that applies to the topic and enforces it for the given action
// (the callers in this file pass SUBSCRIBE_ACTION or PUBLISH_ACTION).
// NOTE(review): some lines of this method (including the return statement and at least one line
// inside the AAF branch) are not visible in this view.
224 private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
225 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
227 boolean isAAFTopic = false;
// The metrics topic (compared case-insensitively) skips both checks below.
228 String metricTopicName = getMetricTopicName();
229 if(!metricTopicName.equalsIgnoreCase(topicName)) {
// AAF path: taken when CADI is enabled and the topic name starts with the enforced AAF prefix.
230 if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
232 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
233 String permission = aaf.aafPermissionString(topicName, action);
// A failed AAF check is logged and reported as DMaaPAccessDeniedException.
234 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
235 ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
236 LOG.info(errRes.toString());
237 throw new DMaaPAccessDeniedException(errRes);
// API-key path: only topics with a non-empty owner are checked - read access for "sub",
// write access for "pub"; any other action value triggers no check here.
240 } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
241 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
242 if(SUBSCRIBE_ACTION.equals(action)) {
243 metaTopic.checkUserRead(user);
244 } else if(PUBLISH_ACTION.equals(action)) {
245 metaTopic.checkUserWrite(user);
252 boolean isCadiEnabled() {
253 return Utils.isCadiEnabled();
256 void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
257 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
258 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
261 private int getMessageLimit(HttpServletRequest request) {
262 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
265 private int getMessageTimeout(HttpServletRequest request) {
266 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
267 int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
268 CambriaConstants.kNoTimeout;
270 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
271 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
274 private boolean isPrettyPrintEnabled() {
275 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
278 private boolean isMetaOffsetEnabled() {
279 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
282 private boolean isTopicNameEnforcedAaf(String topicName) {
283 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
284 return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd);
287 private boolean isCacheEnabled() {
288 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
289 return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
292 private void verifyHostId() {
293 String lhostId = getPropertyFromAJSCmap("clusterhostid");
294 if (lhostId.isEmpty()) {
296 InetAddress.getLocalHost().getCanonicalHostName();
297 } catch (UnknownHostException e) {
298 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
304 private String getMetricTopicName() {
305 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
306 return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
310 * @throws missingReqdSetting
314 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
315 final String requestTime) throws ConfigDbException, AccessDeniedException,
316 CambriaApiException, IOException, missingReqdSetting {
318 final long startMs = System.currentTimeMillis();
319 String remoteHost = ctx.getRequest().getRemoteHost();
320 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
321 .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
322 .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
324 validateIpBlacklist(errRespProvider, ctx);
326 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
327 if (metaTopic == null) {
328 throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
331 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
333 final HttpServletRequest req = ctx.getRequest();
334 boolean chunked = isRequestedChunk(req);
335 String mediaType = getMediaType(req);
336 boolean transactionRequired = isTransactionIdRequired();
338 if (isAAFTopic || transactionRequired) {
339 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
341 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
344 final long endMs = System.currentTimeMillis();
345 final long totalMs = endMs - startMs;
346 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
349 private boolean isRequestedChunk(HttpServletRequest request) {
350 return null != request.getHeader(TRANSFER_ENCODING) &&
351 request.getHeader(TRANSFER_ENCODING).contains("chunked");
354 private String getMediaType(HttpServletRequest request) {
355 String mediaType = request.getContentType();
356 if (mediaType == null || mediaType.length() == 0) {
357 return MimeTypes.kAppGenericBinary;
359 return mediaType.replace("; charset=UTF-8", "").trim();
362 private boolean isTransactionIdRequired() {
363 return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true");
371 * @param defaultPartition
374 * @throws ConfigDbException
375 * @throws AccessDeniedException
376 * @throws TopicExistsException
377 * @throws CambriaApiException
378 * @throws IOException
// Publishes the events parsed from msg to the topic in batches, without transaction metadata,
// then answers with a JSON object holding "count" and "serverTimeMs". A failure is converted
// into a CambriaApiException with the PARTIAL_PUBLISH_MSGS response code.
// NOTE(review): several physical lines of this method are not visible in this view (the end of
// the parameter list, the declaration/update of 'count', the statements that fill and clear
// 'batch'/'pms', and the opening of the try block); comments describe only the visible code.
380 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
// NOTE(review): the remaining parameter(s) are not visible; the body below uses 'mediaType'.
382 throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
383 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
384 // setup the event set
385 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
387 // start processing, building a batch to push to the backend
388 final long startMs = System.currentTimeMillis();
// Default maximum batch size is 16384; a non-empty "event.batch.length" property overrides it.
390 long maxEventBatch = 1024L* 16;
391 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
392 if (null != batchlen && !batchlen.isEmpty())
393 maxEventBatch = Long.parseLong(batchlen);
394 // long maxEventBatch =
396 final LinkedList<Publisher.message> batch = new LinkedList<>();
397 // final ArrayList<KeyedMessage<String, String>> kms = new
399 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
401 // for each message...
402 Publisher.message m = null;
403 while ((m = events.next()) != null) {
404 // add the message to the batch
406 // final KeyedMessage<String, String> data = new
407 // KeyedMessage<String, String>(topic, m.getKey(),
410 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
414 // check if the batch is full
// NOTE(review): this check uses '>' while pushEventsWithTransaction uses '>=' for the same
// comparison - confirm which boundary is intended.
415 final int sizeNow = batch.size();
416 if (sizeNow > maxEventBatch) {
417 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
420 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
423 metricsSet.publishTick(sizeNow);
428 // send the pending batch
429 final int sizeNow = batch.size();
431 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
434 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
437 metricsSet.publishTick(sizeNow);
441 final long endMs = System.currentTimeMillis();
442 final long totalMs = endMs - startMs;
444 LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
445 + ctx.getRequest().getRemoteHost());
448 final JSONObject response = new JSONObject();
449 response.put("count", count);
450 response.put("serverTimeMs", totalMs);
451 respondOk(ctx, response);
// Error mapping: the status defaults to 404; for a CambriaApiException its own status is used
// and the "message" field of its JSON body is extracted.
453 } catch (Exception excp) {
454 int status = HttpStatus.SC_NOT_FOUND;
455 String errorMsg = null;
456 if (excp instanceof CambriaApiException) {
457 status = ((CambriaApiException) excp).getStatus();
458 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
459 JSONObject errObject = new JSONObject(jsonTokener);
460 errorMsg = (String) errObject.get("message");
463 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
464 errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
466 null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
468 LOG.info(errRes.toString());
469 throw new CambriaApiException(errRes);
479 * @param partitionKey
483 * @throws ConfigDbException
484 * @throws AccessDeniedException
485 * @throws TopicExistsException
486 * @throws IOException
487 * @throws CambriaApiException
// Publishes the events parsed from inputStream to the topic in batches, attaching transaction
// details to every message and logging batch start/end details. On success the response carries
// "count", "transactionId" and "serverTimeMs" (plus a "transactionId" header when an id exists).
// Failures are converted into a CambriaApiException with the PARTIAL_PUBLISH_MSGS response code.
// NOTE(review): many physical lines of this method are not visible in this view (declarations of
// 'count' and 'batchId', the statements that fill and clear 'batch'/'pms', try openings, closing
// braces and some statement continuations); comments describe only the visible code.
489 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
490 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
491 throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
493 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
495 // setup the event set
496 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
498 // start processing, building a batch to push to the backend
499 final long startMs = System.currentTimeMillis();
// Default maximum batch size is 16384; a non-empty "event.batch.length" property overrides it.
501 long maxEventBatch = 1024L * 16;
502 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
503 if (null != evenlen && !evenlen.isEmpty())
504 maxEventBatch = Long.parseLong(evenlen);
505 // final long maxEventBatch =
507 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
508 // final ArrayList<KeyedMessage<String, String>> kms = new
510 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
511 Publisher.message m = null;
512 int messageSequence = 1;
// Every message handled by this method is marked as transaction-enabled.
514 final boolean transactionEnabled = true;
515 int publishBatchCount = 0;
// Formatter for the batch start/end timestamps written to the log.
516 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
518 // LOG.warn("Batch Start Id: " +
521 // for each message...
522 batchId = DMaaPContext.getBatchID();
// Holds the transaction id of the most recently processed message; null if no message was read.
524 String responseTransactionId = null;
526 while ((m = events.next()) != null) {
528 // LOG.warn("Batch Start Id: " +
531 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
// NOTE(review): the tail of the call above is not visible in this view.
538 responseTransactionId = m.getLogDetails().getTransactionId();
540 //JSONObject jsonObject = new JSONObject();
541 //jsonObject.put("msgWrapMR", m.getMessage());
542 //jsonObject.put("transactionId", responseTransactionId);
543 // final KeyedMessage<String, String> data = new
544 // KeyedMessage<String, String>(topic, m.getKey(),
547 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
551 // check if the batch is full
552 final int sizeNow = batch.size();
553 if (sizeNow >= maxEventBatch) {
554 String startTime = sdf.format(new Date());
555 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
558 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
560 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
561 // transactionLogs(batch);
// After a successful send, the publisher log details of each message in the batch are logged.
562 for (message msg : batch) {
563 LogDetails logDetails = msg.getLogDetails();
564 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Error mapping for a failed full-batch send: status defaults to 404; a CambriaApiException
// contributes its own status and the "message" field of its JSON body.
566 } catch (Exception excp) {
568 int status = HttpStatus.SC_NOT_FOUND;
569 String errorMsg = null;
570 if (excp instanceof CambriaApiException) {
571 status = ((CambriaApiException) excp).getStatus();
572 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
573 JSONObject errObject = new JSONObject(jsonTokener);
574 errorMsg = (String) errObject.get("message");
576 ErrorResponse errRes = new ErrorResponse(status,
577 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
578 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
579 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
580 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
581 ctx.getRequest().getRemoteHost(), null, null);
582 LOG.info(errRes.toString());
583 throw new CambriaApiException(errRes);
587 metricsSet.publishTick(sizeNow);
588 publishBatchCount = sizeNow;
591 String endTime = sdf.format(new Date());
592 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
593 + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
594 + ",Batch End Time=" + endTime + "]");
// A fresh batch id is requested once a full batch has been sent.
595 batchId = DMaaPContext.getBatchID();
599 // send the pending batch
600 final int sizeNow = batch.size();
602 String startTime = sdf.format(new Date());
603 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
606 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
608 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
610 for (message msg : batch) {
611 LogDetails logDetails = msg.getLogDetails();
612 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Same error mapping as for the full-batch send above, applied to the pending batch.
614 } catch (Exception excp) {
615 int status = HttpStatus.SC_NOT_FOUND;
616 String errorMsg = null;
617 if (excp instanceof CambriaApiException) {
618 status = ((CambriaApiException) excp).getStatus();
619 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
620 JSONObject errObject = new JSONObject(jsonTokener);
621 errorMsg = (String) errObject.get("message");
624 ErrorResponse errRes = new ErrorResponse(status,
625 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
626 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
627 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
628 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
629 ctx.getRequest().getRemoteHost(), null, null);
630 LOG.info(errRes.toString());
631 throw new CambriaApiException(errRes);
634 metricsSet.publishTick(sizeNow);
637 String endTime = sdf.format(new Date());
638 publishBatchCount = sizeNow;
639 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
640 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
644 final long endMs = System.currentTimeMillis();
645 final long totalMs = endMs - startMs;
647 LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
// The response header is only set when at least one message produced a transaction id.
649 if (null != responseTransactionId) {
650 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
654 final JSONObject response = new JSONObject();
655 response.put("count", count);
656 response.put("transactionId", responseTransactionId);
657 response.put("serverTimeMs", totalMs);
658 respondOk(ctx, response);
// Outermost error mapping, identical in shape to the two batch-level handlers above.
660 } catch (Exception excp) {
661 int status = HttpStatus.SC_NOT_FOUND;
662 String errorMsg = null;
663 if (excp instanceof CambriaApiException) {
664 status = ((CambriaApiException) excp).getStatus();
665 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
666 JSONObject errObject = new JSONObject(jsonTokener);
667 errorMsg = (String) errObject.get("message");
670 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
671 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
672 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
673 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
674 ctx.getRequest().getRemoteHost(), null, null);
675 LOG.info(errRes.toString());
676 throw new CambriaApiException(errRes);
685 * @param messageCreationTime
686 * @param messageSequence
688 * @param transactionEnabled
// Builds a LogDetails record for the message via generateLogDetails, stores the message length
// in bytes on it, and attaches both the transaction flag and the record to the message.
690 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
691 final String messageCreationTime, final int messageSequence, final Long batchId,
692 final boolean transactionEnabled) {
693 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
// NOTE(review): the tail of the call above is not visible in this view; generateLogDetails
// declares 'transactionEnabled' as its last parameter.
695 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
696 msg.setTransactionEnabled(transactionEnabled);
697 msg.setLogDetails(logDetails);
700 void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
701 DMaaPResponseBuilder.respondOk(ctx, response);
706 * @author anowarul.islam
// Logging helper that prefixes messages with "[topic/consumerGroup/clientId] " before handing
// them to the shared LOG.
// NOTE(review): parts of this class (comment delimiters, closing braces and the body of info)
// are not visible in this view.
709 private static class LogWrap {
// Prefix built once in the constructor.
710 private final String fId;
713 * constructor initialization
719 public LogWrap(String topic, String cgroup, String cid) {
720 fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
// NOTE(review): the body of info is not visible here; presumably it mirrors warn - confirm.
727 public void info(String msg) {
// Logs the prefixed message at warn level together with the exception.
736 public void warn(String msg, Exception t) {
737 LOG.warn(fId + msg, t);
742 public boolean isTransEnabled() {
743 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
744 boolean istransidreqd = false;
745 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
746 istransidreqd = true;
749 return istransidreqd;
753 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
754 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
755 LogDetails logDetails = new LogDetails();
756 logDetails.setTopicId(topicName);
757 logDetails.setMessageTimestamp(messageTimestamp);
758 logDetails.setPublisherId(Utils.getUserApiKey(request));
759 logDetails.setPublisherIp(request.getRemoteHost());
760 logDetails.setMessageBatchId(batchId);
761 logDetails.setMessageSequence(String.valueOf(messageSequence));
762 logDetails.setTransactionEnabled(transactionEnabled);
763 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
764 logDetails.setServerIp(request.getLocalAddr());