1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import com.att.nsa.configs.ConfigDbException;
28 import com.att.nsa.drumlin.service.standards.MimeTypes;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import com.att.nsa.util.rrConvertor;
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.net.InetAddress;
36 import java.net.UnknownHostException;
37 import java.text.SimpleDateFormat;
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import javax.servlet.http.HttpServletRequest;
43 import javax.ws.rs.core.MediaType;
44 import org.apache.commons.lang.StringUtils;
45 import org.apache.commons.lang.math.NumberUtils;
46 import org.apache.http.HttpStatus;
47 import org.apache.kafka.clients.producer.ProducerRecord;
48 import org.apache.kafka.common.errors.TopicExistsException;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51 import org.onap.dmaap.dmf.mr.CambriaApiException;
52 import org.onap.dmaap.dmf.mr.backends.Consumer;
53 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
54 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
55 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
56 import org.onap.dmaap.dmf.mr.backends.Publisher;
57 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
58 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
59 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
60 import org.onap.dmaap.dmf.mr.beans.LogDetails;
61 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
62 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
63 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
64 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
65 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
66 import org.onap.dmaap.dmf.mr.metabroker.Topic;
67 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
68 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
69 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
70 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
72 import org.onap.dmaap.dmf.mr.service.EventsService;
73 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
74 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
75 import org.onap.dmaap.dmf.mr.utils.Utils;
76 import org.springframework.beans.factory.annotation.Autowired;
77 import org.springframework.stereotype.Service;
80 * This class provides the functionality to publish and subscribe messages to a topic.
83 * @author Ramkumar Sembaiyam
87 public class EventsServiceImpl implements EventsService {
89 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
// Config key for the maximum number of events per publish batch.
90 private static final String BATCH_LENGTH = "event.batch.length";
// HTTP header consulted to detect chunked request bodies.
91 private static final String TRANSFER_ENCODING = "Transfer-Encoding";
// Name of both the AJSC property and the request parameter holding the fetch timeout.
92 private static final String TIMEOUT_PROPERTY = "timeout";
// Action names used when building AAF permission strings and choosing read/write checks.
93 private static final String SUBSCRIBE_ACTION = "sub";
94 private static final String PUBLISH_ACTION = "pub";
// Configurable error-message text used to build ErrorResponse bodies; set via setErrorMessages.
// NOTE(review): an injection annotation (e.g. @Autowired) appears to have been elided
// from this snippet (embedded numbering skips a line) - confirm against the full source.
97 private DMaaPErrorMessages errorMessages;
99 String getPropertyFromAJSCmap(String propertyKey) {
100 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
/**
 * @return the configured {@link DMaaPErrorMessages} holder used to build error responses
 */
103 public DMaaPErrorMessages getErrorMessages() {
104 return errorMessages;
/**
 * Sets the error-message holder (normally supplied by the Spring container).
 *
 * @param errorMessages configurable error text used in error responses
 */
107 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
108 this.errorMessages = errorMessages;
/**
 * Subscribe handler: authorizes the caller, obtains a consumer for the given
 * group/client, streams available messages back to the caller, and commits the
 * consumed offsets after a successful response.
 *
 * NOTE(review): this snippet has lines elided (the embedded numbering is
 * discontinuous and braces do not balance); comments below describe only the
 * statements that are visible.
 *
 * @param ctx           request/response context wrapper
 * @param topic         topic to consume from
 * @param consumerGroup consumer group id
 * @param clientId      consumer (client) id within the group
 * @throws ConfigDbException     on config store access failure
 * @throws AccessDeniedException when the caller may not read the topic
 * @throws UnavailableException  when no consumer can be obtained
 * @throws CambriaApiException   wraps every error response returned to the client
 * @throws IOException           on response/stream I/O failure
 */
123 public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
124 throws ConfigDbException, AccessDeniedException, UnavailableException,
125 CambriaApiException, IOException {
127 final long startTime = System.currentTimeMillis();
128 final HttpServletRequest req = ctx.getRequest();
129 final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
130 final String remoteHost = req.getRemoteHost();
// Pre-build the error-response factory so every failure path reports consistent context.
131 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
132 .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
// Reject blacklisted client IPs before doing any work.
134 validateIpBlacklist(errRespProvider, ctx);
// Topic must exist in the metadata broker, otherwise answer "topic not found".
136 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
137 if (metaTopic == null) {
138 throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
// AAF- or API-key-based authorization, depending on topic configuration.
141 boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
143 final long elapsedMs1 = System.currentTimeMillis() - startTime;
144 logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
// Resolve fetch options: pretty-print, meta offsets, timeout, limit, and filter.
148 final boolean pretty = isPrettyPrintEnabled();
149 final boolean withMeta = isMetaOffsetEnabled();
150 int timeoutMs = getMessageTimeout(req);
151 int limit = getMessageLimit(req);
152 String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
153 logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
155 Consumer consumer = null;
157 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
// Rate-limit the call before handing out a consumer.
158 final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
159 rl.onCall(topic, consumerGroup, clientId, remoteHost);
160 consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
// Build the outbound event stream that drains the consumer into the HTTP response.
162 CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
163 .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
164 coes.setDmaapContext(ctx);
165 coes.setTopic(metaTopic);
166 coes.setTransEnabled(isTransEnabled() || isAAFTopic);
167 coes.setTopicStyle(isAAFTopic);
168 final long elapsedMs2 = System.currentTimeMillis() - startTime;
169 logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
170 + consumerGroup + " " + clientId);
172 respondOkWithStream(ctx, coes);
173 // No IOException thrown during respondOkWithStream, so commit the
174 // new offsets to all the brokers
175 consumer.commitOffsets();
176 final int sent = coes.getSentCount();
177 metricsSet.consumeTick(sent);
178 rl.onSend(topic, consumerGroup, clientId, sent);
179 final long elapsedMs = System.currentTimeMillis() - startTime;
180 logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
181 + topic + " " + consumerGroup + " " + clientId + " on to the server "
// No consumer available (e.g. all slots busy): report 503-style error.
184 } catch (UnavailableException excp) {
185 logger.warn(excp.getMessage(), excp);
186 ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
187 LOG.info(errRes.toString());
188 throw new CambriaApiException(errRes);
// Two clients used the same group/client id concurrently.
190 } catch (ConcurrentModificationException excp1) {
191 LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
192 ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
193 logger.info(errRes.toString());
194 throw new CambriaApiException(errRes);
// Any other failure: destroy the consumer so a broken one is not reused from cache.
196 } catch (Exception excp) {
197 logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
198 + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
199 ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
200 ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
201 logger.info(errRes.toString());
202 throw new CambriaApiException(errRes);
// finally-block: release the consumer when caching is off (closing lines elided here).
204 if (consumer != null && !isCacheEnabled()) {
207 } catch (Exception e) {
208 logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
209 + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
216 private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
217 final String remoteAddr = Utils.getRemoteAddress(ctx);
218 if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
219 ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
220 LOG.info(errRes.toString());
221 throw new CambriaApiException(errRes);
/**
 * Authorizes the caller for the given action ("sub" or "pub") on a topic.
 * The internal metrics topic is exempt from all checks. AAF-enforced topics
 * are checked against an AAF permission; otherwise, owned topics are checked
 * against the authenticated API key's read/write rights.
 *
 * NOTE(review): lines are elided in this snippet (return statement and some
 * closing braces not visible); the visible original appears to return whether
 * the topic was treated as an AAF topic.
 *
 * @param ctx             request context
 * @param metaTopic       topic metadata (owner is consulted for API-key topics)
 * @param topicName       topic name being accessed
 * @param errRespProvider factory for error bodies
 * @param action          SUBSCRIBE_ACTION or PUBLISH_ACTION
 * @throws CambriaApiException   on AAF authorization failure
 * @throws AccessDeniedException when the API key lacks read/write rights
 */
225 private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
226 ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
228 boolean isAAFTopic = false;
// The internal metrics topic bypasses authorization entirely.
229 String metricTopicName = getMetricTopicName();
230 if(!metricTopicName.equalsIgnoreCase(topicName)) {
// CADI enabled + name matches the enforced AAF prefix: require an AAF permission.
231 if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
233 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
234 String permission = aaf.aafPermissionString(topicName, action);
235 if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
236 ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
237 LOG.info(errRes.toString());
238 throw new DMaaPAccessDeniedException(errRes);
// Otherwise, topics with an owner are protected by API-key read/write checks.
241 } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
242 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
243 if(SUBSCRIBE_ACTION.equals(action)) {
244 metaTopic.checkUserRead(user);
245 } else if(PUBLISH_ACTION.equals(action)) {
246 metaTopic.checkUserWrite(user);
// Thin seam over Utils.isCadiEnabled(); package-private so tests can override it.
253 boolean isCadiEnabled() {
254 return Utils.isCadiEnabled();
/**
 * Writes the event stream to the client as application/json with no-cache
 * headers. Headers must be set before the body is streamed.
 *
 * @param ctx  request/response context
 * @param coes stream writer producing the response body
 * @throws IOException on response I/O failure
 */
257 void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
258 DMaaPResponseBuilder.setNoCacheHeadings(ctx);
259 DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
262 private int getMessageLimit(HttpServletRequest request) {
263 return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
266 private int getMessageTimeout(HttpServletRequest request) {
267 String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
268 int defaultTimeoutMs = StringUtils.isNotEmpty(timeoutMsAsString) ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
269 CambriaConstants.kNoTimeout;
271 String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
272 return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
275 private boolean isPrettyPrintEnabled() {
276 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
279 private boolean isMetaOffsetEnabled() {
280 return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
283 private boolean isTopicNameEnforcedAaf(String topicName) {
284 String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
285 return StringUtils.isNotEmpty(topicNameStd) && topicName.startsWith(topicNameStd);
288 private boolean isCacheEnabled() {
289 String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
290 return StringUtils.isNotEmpty(cachePropsSetting) ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
/**
 * When no "clusterhostid" property is configured, attempts to resolve the
 * local canonical hostname.
 *
 * NOTE(review): the resolved hostname is not assigned or used in the visible
 * lines, and this snippet has lines elided (the try-opening line is missing) -
 * confirm against the full source whether the result is intentionally discarded.
 */
293 private void verifyHostId() {
294 String lhostId = getPropertyFromAJSCmap("clusterhostid");
295 if (StringUtils.isEmpty(lhostId)) {
297 InetAddress.getLocalHost().getCanonicalHostName();
298 } catch (UnknownHostException e) {
299 LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
305 private String getMetricTopicName() {
306 String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
307 return StringUtils.isNotEmpty(metricTopicFromProps) ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
/**
 * Publish entry point: authorizes the caller and dispatches either to the
 * transaction-id-stamping publish path (AAF topics, or when transaction ids
 * are required by configuration) or to the plain publish path.
 *
 * NOTE(review): this snippet has lines elided (embedded numbering is
 * discontinuous; an else-branch keyword and a topic-null check appear to be
 * missing from view).
 *
 * @param ctx              request/response context
 * @param topic            topic to publish to
 * @param msg              raw request body containing the event(s)
 * @param defaultPartition partition key to use when a message has none
 * @param requestTime      request timestamp used for transaction logging
 * @throws ConfigDbException     on config store access failure
 * @throws AccessDeniedException when the caller may not write the topic
 * @throws CambriaApiException   wraps error responses returned to the client
 * @throws IOException           on request/response I/O failure
 * @throws missingReqdSetting    when a required setting is absent
 */
315 public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
316 final String requestTime) throws ConfigDbException, AccessDeniedException,
317 CambriaApiException, IOException, missingReqdSetting {
319 final long startMs = System.currentTimeMillis();
320 String remoteHost = ctx.getRequest().getRemoteHost();
321 ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
322 .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
323 .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
// Reject blacklisted client IPs before reading the body.
325 validateIpBlacklist(errRespProvider, ctx);
327 final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
// Authorize for publish; also learns whether this is an AAF-governed topic.
330 final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
332 final HttpServletRequest req = ctx.getRequest();
333 boolean chunked = isRequestedChunk(req);
334 String mediaType = getMediaType(req);
335 boolean transactionRequired = isTransactionIdRequired();
// AAF topics and transaction-required deployments get transaction-id stamping.
337 if (isAAFTopic || transactionRequired) {
338 pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
340 pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
343 final long endMs = System.currentTimeMillis();
344 final long totalMs = endMs - startMs;
345 LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
348 private boolean isRequestedChunk(HttpServletRequest request) {
349 return null != request.getHeader(TRANSFER_ENCODING) &&
350 request.getHeader(TRANSFER_ENCODING).contains("chunked");
353 private String getMediaType(HttpServletRequest request) {
354 String mediaType = request.getContentType();
355 if (mediaType == null || mediaType.length() == 0) {
356 return MimeTypes.kAppGenericBinary;
358 return mediaType.replace("; charset=UTF-8", "").trim();
361 private boolean isTransactionIdRequired() {
362 String transIdReqProperty = getPropertyFromAJSCmap("transidUEBtopicreqd");
363 return StringUtils.isNotEmpty(transIdReqProperty) && transIdReqProperty.equalsIgnoreCase("true");
/**
 * Plain publish path: parses the request body into events, batches them up to
 * the configured batch length, and sends each batch to the Kafka publisher.
 * On any failure a PARTIAL_PUBLISH_MSGS error carrying the count published so
 * far is returned.
 *
 * NOTE(review): this snippet has lines elided (embedded numbering is
 * discontinuous; the try-opening, batch.add/pms.add calls, count updates, and
 * several closing braces are not visible).
 *
 * @param ctx              request/response context
 * @param topic            topic to publish to
 * @param msg              raw request body
 * @param defaultPartition partition key used when a message carries none
 * @param chunked          whether the body is chunked-encoded
 * @throws ConfigDbException     on config store access failure
 * @throws AccessDeniedException when the caller may not write the topic
 * @throws CambriaApiException   on partial-publish failure
 * @throws IOException           on request/response I/O failure
 */
380 private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
382 throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
383 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
384 // setup the event set
385 final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
387 // start processing, building a batch to push to the backend
388 final long startMs = System.currentTimeMillis();
// Batch size defaults to 16K events unless overridden by event.batch.length.
390 long maxEventBatch = 1024L* 16;
391 String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
392 if (null != batchlen && !batchlen.isEmpty())
393 maxEventBatch = Long.parseLong(batchlen);
394 // long maxEventBatch =
396 final LinkedList<Publisher.message> batch = new LinkedList<>();
397 // final ArrayList<KeyedMessage<String, String>> kms = new
399 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
401 // for each message...
402 Publisher.message m = null;
403 while ((m = events.next()) != null) {
404 // add the message to the batch
406 // final KeyedMessage<String, String> data = new
407 // KeyedMessage<String, String>(topic, m.getKey(),
410 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
414 // check if the batch is full
415 final int sizeNow = batch.size();
416 if (sizeNow > maxEventBatch) {
417 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
// Flush the full batch to Kafka and record the publish metric.
420 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
423 metricsSet.publishTick(sizeNow);
428 // send the pending batch
429 final int sizeNow = batch.size();
431 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
434 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
437 metricsSet.publishTick(sizeNow);
441 final long endMs = System.currentTimeMillis();
442 final long totalMs = endMs - startMs;
444 LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
445 + ctx.getRequest().getRemoteHost());
// Success: report the count and the server-side elapsed time.
448 final JSONObject response = new JSONObject();
449 response.put("count", count);
450 response.put("serverTimeMs", totalMs);
451 respondOk(ctx, response);
// Failure: surface the status/message from a CambriaApiException when present,
// and report a partial-publish error carrying the count published so far.
453 } catch (Exception excp) {
454 int status = HttpStatus.SC_NOT_FOUND;
455 String errorMsg = null;
456 if (excp instanceof CambriaApiException) {
457 status = ((CambriaApiException) excp).getStatus();
458 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
459 JSONObject errObject = new JSONObject(jsonTokener);
460 errorMsg = (String) errObject.get("message");
463 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
464 errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
466 null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
468 LOG.info(errRes.toString());
469 throw new CambriaApiException(errRes);
/**
 * Transactional publish path: like the plain path, but stamps every message
 * with transaction/log details (batch id, sequence, publisher identity),
 * logs per-message publisher details after each batch send, and returns the
 * last transaction id to the caller in a "transactionId" response header and
 * body field.
 *
 * NOTE(review): this snippet has lines elided (embedded numbering is
 * discontinuous; the try-opening, batch/pms add calls, count updates, and
 * several closing braces are not visible). Comments describe visible
 * statements only.
 *
 * @param ctx          request/response context
 * @param inputStream  raw request body
 * @param topic        topic to publish to
 * @param partitionKey partition key used when a message carries none
 * @param requestTime  request timestamp recorded in transaction logs
 * @param chunked      whether the body is chunked-encoded
 * @param mediaType    normalized request media type
 * @throws ConfigDbException     on config store access failure
 * @throws AccessDeniedException when the caller may not write the topic
 * @throws IOException           on request/response I/O failure
 * @throws CambriaApiException   on partial-publish failure
 */
489 private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
490 final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
491 throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
493 final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
495 // setup the event set
496 final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
498 // start processing, building a batch to push to the backend
499 final long startMs = System.currentTimeMillis();
// Batch size defaults to 16K events unless overridden by event.batch.length.
501 long maxEventBatch = 1024L * 16;
502 String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
503 if (null != evenlen && !evenlen.isEmpty())
504 maxEventBatch = Long.parseLong(evenlen);
505 // final long maxEventBatch =
507 final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
508 // final ArrayList<KeyedMessage<String, String>> kms = new
510 final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
511 Publisher.message m = null;
512 int messageSequence = 1;
514 final boolean transactionEnabled = true;
515 int publishBatchCount = 0;
// Used only for human-readable batch start/end log timestamps; local to this
// call, so SimpleDateFormat's lack of thread-safety is not an issue here.
516 SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
518 // LOG.warn("Batch Start Id: " +
521 // for each message...
// New batch id for the first batch; refreshed after every flush below.
522 batchId = DMaaPContext.getBatchID();
524 String responseTransactionId = null;
526 while ((m = events.next()) != null) {
528 // LOG.warn("Batch Start Id: " +
// Stamp the message with transaction/log details (sequence, batch id, caller).
531 addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
// Remember the latest transaction id; it is echoed back to the client.
538 responseTransactionId = m.getLogDetails().getTransactionId();
540 //JSONObject jsonObject = new JSONObject();
541 //jsonObject.put("msgWrapMR", m.getMessage());
542 //jsonObject.put("transactionId", responseTransactionId);
543 // final KeyedMessage<String, String> data = new
544 // KeyedMessage<String, String>(topic, m.getKey(),
547 final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
551 // check if the batch is full
552 final int sizeNow = batch.size();
553 if (sizeNow >= maxEventBatch) {
554 String startTime = sdf.format(new Date());
555 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
558 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
// Flush the full batch, then log publisher details for every message in it.
560 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
561 // transactionLogs(batch);
562 for (message msg : batch) {
563 LogDetails logDetails = msg.getLogDetails();
564 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Mid-stream failure: report partial publish with the count sent so far.
566 } catch (Exception excp) {
568 int status = HttpStatus.SC_NOT_FOUND;
569 String errorMsg = null;
570 if (excp instanceof CambriaApiException) {
571 status = ((CambriaApiException) excp).getStatus();
572 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
573 JSONObject errObject = new JSONObject(jsonTokener);
574 errorMsg = (String) errObject.get("message");
576 ErrorResponse errRes = new ErrorResponse(status,
577 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
578 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
579 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
580 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
581 ctx.getRequest().getRemoteHost(), null, null);
582 LOG.info(errRes.toString());
583 throw new CambriaApiException(errRes);
587 metricsSet.publishTick(sizeNow);
588 publishBatchCount = sizeNow;
591 String endTime = sdf.format(new Date());
592 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
593 + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
594 + ",Batch End Time=" + endTime + "]");
// Start a fresh batch id for the next batch.
595 batchId = DMaaPContext.getBatchID();
599 // send the pending batch
600 final int sizeNow = batch.size();
602 String startTime = sdf.format(new Date());
603 LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
606 // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
// Flush whatever remains after the input is exhausted.
608 ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
610 for (message msg : batch) {
611 LogDetails logDetails = msg.getLogDetails();
612 LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
// Final-flush failure: same partial-publish error shape as the mid-stream case.
614 } catch (Exception excp) {
615 int status = HttpStatus.SC_NOT_FOUND;
616 String errorMsg = null;
617 if (excp instanceof CambriaApiException) {
618 status = ((CambriaApiException) excp).getStatus();
619 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
620 JSONObject errObject = new JSONObject(jsonTokener);
621 errorMsg = (String) errObject.get("message");
624 ErrorResponse errRes = new ErrorResponse(status,
625 DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
626 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
627 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
628 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
629 ctx.getRequest().getRemoteHost(), null, null);
630 LOG.info(errRes.toString());
631 throw new CambriaApiException(errRes);
634 metricsSet.publishTick(sizeNow);
637 String endTime = sdf.format(new Date());
638 publishBatchCount = sizeNow;
639 LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
640 + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
644 final long endMs = System.currentTimeMillis();
645 final long totalMs = endMs - startMs;
647 LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
// Echo the last transaction id back to the caller via header and body.
649 if (null != responseTransactionId) {
650 ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
654 final JSONObject response = new JSONObject();
655 response.put("count", count);
656 response.put("transactionId", responseTransactionId);
657 response.put("serverTimeMs", totalMs);
658 respondOk(ctx, response);
// Outer catch: any other failure is also reported as a partial publish.
660 } catch (Exception excp) {
661 int status = HttpStatus.SC_NOT_FOUND;
662 String errorMsg = null;
663 if (excp instanceof CambriaApiException) {
664 status = ((CambriaApiException) excp).getStatus();
665 JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
666 JSONObject errObject = new JSONObject(jsonTokener);
667 errorMsg = (String) errObject.get("message");
670 ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
671 "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
672 + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
673 null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
674 ctx.getRequest().getRemoteHost(), null, null);
675 LOG.info(errRes.toString());
676 throw new CambriaApiException(errRes);
/**
 * Attaches transaction log details (topic, caller identity, timestamps,
 * sequence, batch id, payload length) to a message before it is published.
 *
 * NOTE(review): the generateLogDetails call below is cut mid-argument-list in
 * this snippet (a continuation line is elided).
 *
 * @param msg                 message to decorate
 * @param topic               topic being published to
 * @param request             originating HTTP request (publisher identity/IP)
 * @param messageCreationTime request timestamp recorded in the log details
 * @param messageSequence     1-based sequence of the message within the request
 * @param batchId             id of the publish batch this message belongs to
 * @param transactionEnabled  whether transaction logging is active
 */
690 private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
691 final String messageCreationTime, final int messageSequence, final Long batchId,
692 final boolean transactionEnabled) {
693 LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
695 logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
696 msg.setTransactionEnabled(transactionEnabled);
697 msg.setLogDetails(logDetails);
/**
 * Sends a 200 response with the given JSON body.
 *
 * @param ctx      request/response context
 * @param response JSON payload to return
 * @throws IOException on response I/O failure
 */
700 void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
701 DMaaPResponseBuilder.respondOk(ctx, response);
/**
 * Small logging helper that prefixes every line with a fixed
 * "[topic/group/client]" tag so fetch-related log lines are easy to correlate.
 *
 * NOTE(review): method bodies are partially elided in this snippet (the info()
 * body and closing braces are not visible).
 */
709 private static class LogWrap {
// Pre-built "[topic/cgroup/cid] " prefix applied to every log line.
710 private final String fId;
/**
 * @param topic  topic name
 * @param cgroup consumer group id
 * @param cid    consumer (client) id
 */
719 public LogWrap(String topic, String cgroup, String cid) {
720 fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
// Logs at INFO with the fixed prefix (body elided in this snippet).
727 public void info(String msg) {
// Logs at WARN with the fixed prefix, including the exception.
736 public void warn(String msg, Exception t) {
737 LOG.warn(fId + msg, t);
742 public boolean isTransEnabled() {
743 String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
744 boolean istransidreqd = false;
745 if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
746 istransidreqd = true;
749 return istransidreqd;
753 private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
754 final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
755 LogDetails logDetails = new LogDetails();
756 logDetails.setTopicId(topicName);
757 logDetails.setMessageTimestamp(messageTimestamp);
758 logDetails.setPublisherId(Utils.getUserApiKey(request));
759 logDetails.setPublisherIp(request.getRemoteHost());
760 logDetails.setMessageBatchId(batchId);
761 logDetails.setMessageSequence(String.valueOf(messageSequence));
762 logDetails.setTransactionEnabled(transactionEnabled);
763 logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
764 logDetails.setServerIp(request.getLocalAddr());