public static Integer getRequestTimeOut() {
return requestTimeOut;
}
+
+ public CrudAsyncGraphDataService(GraphDao dao,
+ EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException {
+ this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
+ }
public CrudAsyncGraphDataService(GraphDao dao,
+ GraphDao daoForGet,
EventPublisher asyncRequestPublisher,
EventConsumer asyncResponseConsumer) throws CrudException {
- super(dao);
+ super();
+ this.dao = dao;
+ this.daoForGet = daoForGet;
requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
try {
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertVertexResponse(
OxmModelValidator.validateOutgoingPayload(version,
response.getVertex().toVertex()), version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertEdgeResponse(
RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertVertexResponse(
OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
public String patchVertex(String version, String id, String type, VertexPayload payload)
throws CrudException {
Vertex existingVertex
- = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
+ = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version);
Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
type, payload.getProperties(),
existingVertex);
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertVertexResponse(
OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return "";
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return "";
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertEdgeResponse(
RelationshipSchemaValidator.validateOutgoingPayload(version,
response.getEdge().toEdge()), version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertEdgeResponse(
RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
timer.cancel();
}
-
+
@Override
- public String addBulk(String version, BulkPayload payload) throws CrudException {
- throw new CrudException("Bulk operation not supported in async mode", Status.BAD_REQUEST);
+ protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
+ }
+
+ @Override
+ protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
+ }
+
+ @Override
+ protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
+ }
+
+ @Override
+ protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
+ }
+
+ @Override
+ protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
+ }
+
+ @Override
+ protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
+ }
+
+ private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logSuccessResponse(event, response);
+ } else {
+ logErrorResponse(event, response);
+ throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ return response;
}
-
+ private void logSuccessResponse(GraphEvent event, GraphEvent response) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ }
+
+ private void logErrorResponse(GraphEvent event, GraphEvent response) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ }
}
\ No newline at end of file