+ String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
+
+ // publish to request queue
+ try {
+ asyncRequestPublisher.sendSync(eventEnvelopeJson);
+ } catch (Exception e) {
+ throw new CrudException(
+ "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(),
+ Status.INTERNAL_SERVER_ERROR);
+ }
+
+ logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
+
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString());
+
+ ExecutorService executor =
+ Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
+ CollectGraphResponse collector = new CollectGraphResponse();
+ CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
+ GraphEventEnvelope response;
+ Future<GraphEventEnvelope> future = executor.submit(collector);
+ try {
+ response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
+ "Request timed out for transactionId: " + event.getTransactionId());
+ future.cancel(true);
+ throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
+ } finally {
+ // Kill the thread as the work is completed
+ executor.shutdownNow();
+ }
+ metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
+ "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
+ + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
+ + " ms");
+ return response;
+ }
+
+ @Override
+ public ImmutablePair<EntityTag, String> addVertex(String version, String type, VertexPayload payload)
+ throws CrudException {
+ // Validate the incoming payload
+ Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
+ vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
+ // Create graph request event
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
+ } catch (IOException e) {
+ throw new CrudException(e);
+ }
+ String responsePayload = responseHandler.handleVertexResponse(version, event, response);
+
+ return new ImmutablePair<EntityTag, String>(entityTag, responsePayload);
+ }
+
+ @Override
+ public ImmutablePair<EntityTag, String> addEdge(String version, String type, EdgePayload payload)
+ throws CrudException {
+ Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
+
+ // Create graph request event
+ GraphEvent event =
+ GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
+ } catch (IOException e) {
+ throw new CrudException(e);
+ }
+ String responsePayload = responseHandler.handleEdgeResponse(version, event, response);