Change-Id: I309010480012cc66ac8d44d6be65f5897174ec3f
Issue-ID: AAI-1430
Signed-off-by: bogumil_zebek <bogumil.zebek@nokia.com>
response = Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
LoggingUtil.logInternalError(logger, e);
} finally {
response = Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
LoggingUtil.logInternalError(logger, e);
} finally {
- logger.debug(response.getEntity().toString());
+ if (response != null) {
+ logger.debug(response.getEntity().toString());
+ }
LoggingUtil.logRestRequest(logger, auditLogger, req, response);
metricsLogger.info(ChampMsgs.PROCESSED_REQUEST, "GET", Long.toString(System.currentTimeMillis() - startTimeInMs));
}
LoggingUtil.logRestRequest(logger, auditLogger, req, response);
metricsLogger.info(ChampMsgs.PROCESSED_REQUEST, "GET", Long.toString(System.currentTimeMillis() - startTimeInMs));
}
if (asyncRequestConsumer == null) {
logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
"Unable to initialize ChampAsyncRequestProcessor");
if (asyncRequestConsumer == null) {
logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
"Unable to initialize ChampAsyncRequestProcessor");
+ } else {
+ try {
+ Iterable<String> events = asyncRequestConsumer.consume();
+
+ if (events == null || !events.iterator().hasNext()) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
+ } else {
+ processEvents(events);
+ }
- Iterable<String> events = null;
- try {
- events = asyncRequestConsumer.consume();
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
- return;
+ asyncRequestConsumer.commitOffsets();
+ } catch (OperationNotSupportedException e) {
+ // Dmaap doesnt support commit with offset
+ logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+ }
- if (events == null || !events.iterator().hasNext()) {
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
- } else {
- for (String event : events) {
- try {
- GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
- GraphEvent requestEvent = requestEnvelope.getBody();
- auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ private void processEvents(Iterable<String> events) {
+ for (String event : events) {
+ try {
+ GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
+ GraphEvent requestEvent = requestEnvelope.getBody();
+ auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
"Event received of type: " + requestEvent.getObjectType() + " with key: "
"Event received of type: " + requestEvent.getObjectType() + " with key: "
- + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
- + " , operation: " + requestEvent.getOperation().toString());
- logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
"Event received of type: " + requestEvent.getObjectType() + " with key: "
"Event received of type: " + requestEvent.getObjectType() + " with key: "
- + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
- + " , operation: " + requestEvent.getOperation().toString());
- logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
+ + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
+ + " , operation: " + requestEvent.getOperation().toString());
+ logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
- // Try to submit the event to be published to the event bus.
- if (!requestProcesserEventQueue.offer(requestEnvelope)) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
+ // Try to submit the event to be published to the event bus.
+ if (!requestProcesserEventQueue.offer(requestEnvelope)) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
"Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
"Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
- }
-
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
- try {
- asyncRequestConsumer.commitOffsets();
- } catch (OperationNotSupportedException e) {
- // Dmaap doesnt support commit with offset
- logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
- } catch (Exception e) {
- logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
+ } catch (Exception e) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
+ }
}
public Integer getRequestPollingTimeSeconds() {
}
public Integer getRequestPollingTimeSeconds() {