@Override
public void run() {
- while (true) {
- GraphEventEnvelope eventEnvelope = null;
- GraphEvent event = null;
- try {
- // Get the next event to be published from the queue.
- eventEnvelope = responsePublisherEventQueue.take();
- event = eventEnvelope.getBody();
- } catch (InterruptedException e) {
- // Restore the interrupted status.
- Thread.currentThread().interrupt();
- }
- // Publish the response
- try {
- event.setTimestamp(System.currentTimeMillis());
- asyncResponsePublisher.sendSync(eventEnvelope.toJson());
- if (event.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
- "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult());
- } else {
- logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
- "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
- + event.getErrorMessage());
- }
- } catch (Exception ex) {
- logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());
- }
-
+ while (true) {
+ try {
+ // Get the next event to be published from the queue.
+ GraphEventEnvelope eventEnvelope = responsePublisherEventQueue.take();
+ GraphEvent event = eventEnvelope.getBody();
+
+ // Publish the response
+ try {
+ event.setTimestamp(System.currentTimeMillis());
+ asyncResponsePublisher.sendSync(eventEnvelope.toJson());
+ if (event.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
+ "Response published for Event of type: " + event.getObjectType() + " with key: " + event
+ .getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult());
+ } else {
+ logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
+ "Response published for Event of type: " + event.getObjectType() + " with key: " + event
+ .getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
+ + event.getErrorMessage());
+ }
+ } catch (Exception ex) {
+ logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
}
}
}