/**
* Logger obj
*/
- //private Logger log = Logger.getLogger(EventsRestService.class.toString());
- private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
+ private static final EELFLogger log = EELFManager.getLogger(EventsRestService.class);
/**
* HttpServletRequest obj
*/
public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
String consumergroup,
@PathParam("consumerid") String consumerid) throws CambriaApiException {
- // log.info("Consuming message from topic " + topic );
+ log.info("Consuming message from topic " + topic );
DMaaPContext dMaaPContext = getDmaapContext();
dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
try {
-
eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
- }
- catch (TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }
- catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
-
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
- }
-
- catch (ConfigDbException | UnavailableException | IOException e) {
+ } catch (ConfigDbException | UnavailableException | IOException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
request.getRemoteHost());
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}
try {
throw new TopicExistsException("Incorrect URL");
- }
- catch (TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
-
}
/**
@GET
@Path("/{topic}/{consumergroup}")
public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup")
- String consumergroup
- ) throws CambriaApiException {
+ String consumergroup) throws CambriaApiException {
// log.info("Consuming message from topic " + topic );
DMaaPContext dMaaPContext = getDmaapContext();
dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
-
try {
-
throw new TopicExistsException("Incorrect URL");
- }
- catch (TopicExistsException e) {
+ } catch (TopicExistsException e) {
log.error("Error while reading data from topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
-
}
-
-
-
-
-
-
/**
* This method is used to publish messages.Taking two parameter topic and
public void pushEvents(@PathParam("topic") String topic, InputStream msg,
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
log.info("Publishing message to topic " + topic);
-
try {
eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
- }
- catch ( TopicExistsException e) {
+ } catch ( TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
+ } catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
-
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+ } catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
@Path("/transaction/{topic}")
public void pushEventsWithTransaction(@PathParam("topic") String topic,
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
- // log.info("Publishing message with transaction id for topic " + topic
- // );
-
-
try {
-
eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
partitionKey,
Utils.getFormattedDate(new Date()));
- }
-
- catch ( TopicExistsException e) {
+ } catch ( TopicExistsException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
- catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
+ } catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
log.error("Error while publishing to topic [" + topic + "].", e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
- }
-
- catch (ConfigDbException | IOException | missingReqdSetting e) {
+ } catch (ConfigDbException | IOException | missingReqdSetting e) {
log.error("Error while publishing to topic : " + topic, e);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
null, null);
log.info(errRes.toString());
throw new CambriaApiException(errRes);
-
}
}