- private static Integer requestTimeOut;
-
- private EventPublisher asyncRequestPublisher;
-
- private Timer timer;
-
- public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
- private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
-
- private static Logger logger = LoggerFactory.getInstance()
- .getLogger(CrudAsyncGraphDataService.class.getName());
- private static Logger metricsLogger = LoggerFactory.getInstance()
- .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
- private static LogFields OK_FIELDS = new LogFields();
-
- static {
- OK_FIELDS.setField(Status.OK, Status.OK.toString());
- }
-
- public static Integer getRequestTimeOut() {
- return requestTimeOut;
- }
-
- public CrudAsyncGraphDataService(GraphDao dao,
- EventPublisher asyncRequestPublisher,
- EventConsumer asyncResponseConsumer) throws CrudException {
- this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
- }
-
- public CrudAsyncGraphDataService(GraphDao dao,
- GraphDao daoForGet,
- EventPublisher asyncRequestPublisher,
- EventConsumer asyncResponseConsumer) throws CrudException {
-
- super();
- this.dao = dao;
- this.daoForGet = daoForGet;
-
- requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
- try {
- requestTimeOut
- = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
- } catch (NumberFormatException ex) {
- // Leave it as the default
+ private static Integer requestTimeOut;
+
+ private EventPublisher asyncRequestPublisher;
+
+ private Timer timer;
+
+ public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
+ private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
+ private static Logger metricsLogger =
+ LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
+ private static LogFields okFields = new LogFields();
+ private EtagGenerator etagGenerator;
+
+ static {
+ okFields.setField(Status.OK, Status.OK.toString());
+ }
+
+ private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
+
+ public static Integer getRequestTimeOut() {
+ return requestTimeOut;
+ }
+
+ public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
+ this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
+ }
+
+ public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
+
+ super();
+ this.dao = dao;
+ this.daoForGet = daoForGet;
+
+ requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
+ try {
+ requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
+ } catch (NumberFormatException ex) {
+ // Leave it as the default
+ }
+
+ Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
+ try {
+ responsePollInterval =
+ Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
+ } catch (Exception ex) {
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
+ + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
+ }
+
+ // Start the Response Consumer timer
+ CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(
+ asyncResponseConsumer, new GraphEventUpdater()
+ );
+ timer = new Timer("crudAsyncResponseConsumer-1");
+ timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
+
+ this.asyncRequestPublisher = asyncRequestPublisher;
+ this.etagGenerator = new EtagGenerator();
+
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
+ }
+
+ public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
+ private volatile GraphEventEnvelope graphEventEnvelope;
+ private volatile CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public GraphEventEnvelope call() throws TimeoutException {
+ try {
+ // Wait until graphEvent is available
+ latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ latch.countDown();
+ if (this.graphEventEnvelope != null) {
+ return this.graphEventEnvelope;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+ return this.graphEventEnvelope;
+ }
+
+ public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
+ this.graphEventEnvelope = eventEnvelope;
+ latch.countDown();
+ }
+ }
+
+ private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
+
+ long startTimeInMs = System.currentTimeMillis();
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+ MdcOverride override = new MdcOverride();
+ override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
+
+ String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
+
+ // publish to request queue
+ try {
+ asyncRequestPublisher.sendSync(eventEnvelopeJson);
+ } catch (Exception e) {
+ throw new CrudException(
+ "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(),
+ Status.INTERNAL_SERVER_ERROR);
+ }
+
+ logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
+
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString());
+
+ ExecutorService executor =
+ Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
+ CollectGraphResponse collector = new CollectGraphResponse();
+ CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
+ GraphEventEnvelope response;
+ Future<GraphEventEnvelope> future = executor.submit(collector);
+ try {
+ response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
+ "Request timed out for transactionId: " + event.getTransactionId());
+ future.cancel(true);
+ throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
+ } finally {
+ // Kill the thread as the work is completed
+ executor.shutdownNow();
+ }
+ metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
+ "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
+ + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
+ + " ms");
+ return response;