*/
package org.onap.crud.service;
-import com.att.ecomp.event.api.EventConsumer;
-import com.att.ecomp.event.api.EventPublisher;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
import org.onap.aai.cl.api.LogFields;
import org.onap.aai.cl.api.Logger;
import org.onap.crud.parser.CrudResponseBuilder;
import org.onap.crud.util.CrudProperties;
import org.onap.crud.util.CrudServiceConstants;
-import org.onap.crud.util.CrudServiceUtil;
import org.onap.schema.OxmModelValidator;
import org.onap.schema.RelationshipSchemaValidator;
import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
+import java.util.HashMap;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PreDestroy;
import javax.ws.rs.core.Response.Status;
-public class CrudAsyncGraphDataService {
+public class CrudAsyncGraphDataService extends AbstractGraphDataService {
private static Integer requestTimeOut;
- private GraphDao dao;
-
private EventPublisher asyncRequestPublisher;
private Timer timer;
public static Integer getRequestTimeOut() {
return requestTimeOut;
}
+
+ public CrudAsyncGraphDataService(GraphDao dao,
+ EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException {
+ this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
+ }
public CrudAsyncGraphDataService(GraphDao dao,
+ GraphDao daoForGet,
EventPublisher asyncRequestPublisher,
EventConsumer asyncResponseConsumer) throws CrudException {
+ super();
+ this.dao = dao;
+ this.daoForGet = daoForGet;
+
requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
try {
requestTimeOut
+ " error: " + ex.getMessage());
}
- this.dao = dao;
-
// Start the Response Consumer timer
CrudAsyncResponseConsumer crudAsyncResponseConsumer
= new CrudAsyncResponseConsumer(asyncResponseConsumer);
timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
this.asyncRequestPublisher = asyncRequestPublisher;
-
- // load the schemas
- CrudServiceUtil.loadModels();
+
logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
"CrudAsyncGraphDataService initialized SUCCESSFULLY!");
}
}
}
- private GraphEvent sendAndWait(GraphEvent event) throws Exception {
+ private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
long startTimeInMs = System.currentTimeMillis();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
// publish to request queue
- asyncRequestPublisher.sendSync(event.toJson());
+ try {
+ asyncRequestPublisher.sendSync(event.toJson());
+ } catch (Exception e) {
+ throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
+ }
+
+ logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
"Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
return response;
}
- public String addVertex(String version, String type, VertexPayload payload) throws Exception {
+ public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
// Validate the incoming payload
Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
type, payload.getProperties());
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertVertexResponse(
OxmModelValidator.validateOutgoingPayload(version,
response.getVertex().toVertex()), version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
- public String addEdge(String version, String type, EdgePayload payload) throws Exception {
+ public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
// Create graph request event
GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertEdgeResponse(
RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
public String updateVertex(String version, String id, String type, VertexPayload payload)
- throws Exception {
+ throws CrudException {
Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
type, payload.getProperties());
GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertVertexResponse(
OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
public String patchVertex(String version, String id, String type, VertexPayload payload)
- throws Exception {
+ throws CrudException {
Vertex existingVertex
- = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
+ = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>());
Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
type, payload.getProperties(),
existingVertex);
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertVertexResponse(
OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
- public String deleteVertex(String version, String id, String type) throws Exception {
+ public String deleteVertex(String version, String id, String type) throws CrudException {
type = OxmModelValidator.resolveCollectionType(version, type);
GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
.vertex(new GraphEventVertex(id, version, type, null)).build();
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return "";
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
- public String deleteEdge(String version, String id, String type) throws Exception {
+ public String deleteEdge(String version, String id, String type) throws CrudException {
RelationshipSchemaValidator.validateType(version, type);
GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
.edge(new GraphEventEdge(id, version, type, null, null, null)).build();
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return "";
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
public String updateEdge(String version, String id, String type, EdgePayload payload)
- throws Exception {
- Edge edge = dao.getEdge(id, type);
+ throws CrudException {
+ Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
payload);
GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertEdgeResponse(
RelationshipSchemaValidator.validateOutgoingPayload(version,
response.getEdge().toEdge()), version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
public String patchEdge(String version, String id, String type, EdgePayload payload)
- throws Exception {
- Edge edge = dao.getEdge(id, type);
+ throws CrudException {
+ Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
payload);
GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
GraphEvent response = sendAndWait(event);
if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
+ logSuccessResponse(event, response);
return CrudResponseBuilder.buildUpsertEdgeResponse(
RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
version);
} else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
+ logErrorResponse(event, response);
throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
}
}
- public String getEdge(String version, String id, String type) throws CrudException {
- RelationshipSchemaValidator.validateType(version, type);
- Edge edge = dao.getEdge(id, type);
+ @PreDestroy
+ protected void preShutdown() {
+ timer.cancel();
- return CrudResponseBuilder.buildGetEdgeResponse(RelationshipSchemaValidator
- .validateOutgoingPayload(version, edge),
- version);
}
-
- public String getEdges(String version, String type, Map<String, String> filter)
- throws CrudException {
- RelationshipSchemaValidator.validateType(version, type);
- List<Edge> items = dao.getEdges(type,
- RelationshipSchemaValidator.resolveCollectionfilter(version, type, filter));
- return CrudResponseBuilder.buildGetEdgesResponse(items, version);
+
+ @Override
+ protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
}
-
- public String getVertex(String version, String id, String type) throws CrudException {
- type = OxmModelValidator.resolveCollectionType(version, type);
- Vertex vertex = dao.getVertex(id, type);
- List<Edge> edges = dao.getVertexEdges(id);
- return CrudResponseBuilder.buildGetVertexResponse(OxmModelValidator
- .validateOutgoingPayload(version, vertex), edges,
- version);
+
+ @Override
+ protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
}
-
- public String getVertices(String version, String type, Map<String, String> filter)
- throws CrudException {
- type = OxmModelValidator.resolveCollectionType(version, type);
- List<Vertex> items = dao.getVertices(type,
- OxmModelValidator.resolveCollectionfilter(version, type, filter));
- return CrudResponseBuilder.buildGetVerticesResponse(items, version);
+
+ @Override
+ protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
}
-
- @PreDestroy
- protected void preShutdown() {
- timer.cancel();
-
+
+ @Override
+ protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
+ }
+
+ @Override
+ protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
+ }
+
+ @Override
+ protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
+ }
+
+ private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logSuccessResponse(event, response);
+ } else {
+ logErrorResponse(event, response);
+ throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ return response;
}
-
+ private void logSuccessResponse(GraphEvent event, GraphEvent response) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ }
+
+ private void logErrorResponse(GraphEvent event, GraphEvent response) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ }
}
\ No newline at end of file