X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fcrud%2Fservice%2FCrudAsyncGraphDataService.java;h=7906de0356ed7b6dcaf5115cc4b7eb282471d3a2;hb=f60a17c6abb6deef1c24f917488745cbc6e6a566;hp=5d37acb3bb24c548985f235e773cab3d1fb3800e;hpb=18703cdc19842806969d30f19f8171469c79a0fe;p=aai%2Fgizmo.git diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java index 5d37acb..7906de0 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java @@ -20,8 +20,13 @@ */ package org.onap.crud.service; +import java.io.IOException; +import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Timer; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -32,7 +37,10 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.PreDestroy; +import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.onap.aai.cl.api.LogFields; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; @@ -40,6 +48,7 @@ import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.cl.mdc.MdcOverride; import org.onap.aai.event.api.EventConsumer; import org.onap.aai.event.api.EventPublisher; +import org.onap.aai.restclient.client.OperationResult; import org.onap.crud.dao.GraphDao; import org.onap.crud.entity.Edge; import org.onap.crud.entity.Vertex; @@ -51,11 +60,16 @@ import org.onap.crud.event.envelope.GraphEventEnvelope; import org.onap.crud.event.response.GraphEventResponseHandler; import org.onap.crud.exception.CrudException; import org.onap.crud.logging.CrudServiceMsgs; +import org.onap.crud.parser.CrudResponseBuilder; import org.onap.crud.util.CrudProperties; import org.onap.crud.util.CrudServiceConstants; +import org.onap.crud.util.CrudServiceUtil; +import org.onap.crud.util.etag.EtagGenerator; import org.onap.schema.OxmModelValidator; import org.onap.schema.RelationshipSchemaValidator; +import com.google.gson.JsonElement; + public class CrudAsyncGraphDataService extends AbstractGraphDataService { private static Integer requestTimeOut; @@ -71,6 +85,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { private static Logger metricsLogger = LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName()); private static LogFields okFields = new LogFields(); + private EtagGenerator etagGenerator; static { okFields.setField(Status.OK, Status.OK.toString()); @@ -83,12 +98,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { } public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher, - EventConsumer asyncResponseConsumer) throws CrudException { + EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException { this(dao, dao, asyncRequestPublisher, asyncResponseConsumer); } public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher, - EventConsumer asyncResponseConsumer) throws CrudException { + EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException { super(); this.dao = dao; @@ -111,11 +126,14 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { } // Start the Response Consumer timer - CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(asyncResponseConsumer); + CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer( + asyncResponseConsumer, new GraphEventUpdater() + ); timer = new Timer("crudAsyncResponseConsumer-1"); timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); this.asyncRequestPublisher = asyncRequestPublisher; + this.etagGenerator = new EtagGenerator(); logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!"); } @@ -168,8 +186,8 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString()); + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString()); ExecutorService executor = Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); @@ -199,50 +217,92 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { } @Override - public String addVertex(String version, String type, VertexPayload payload) throws CrudException { + public ImmutablePair addVertex(String version, String type, VertexPayload payload) + throws CrudException { // Validate the incoming payload Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties()); - vertex.getProperties().put(org.onap.schema.OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type); + vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type); // Create graph request event GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); GraphEventEnvelope response = sendAndWait(event); - return responseHandler.handleVertexResponse(version, event, response); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleVertexResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } @Override - public String addEdge(String version, String type, EdgePayload payload) throws CrudException { + public ImmutablePair addEdge(String version, String type, EdgePayload payload) + throws CrudException { Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload); + // Create graph request event GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); GraphEventEnvelope response = sendAndWait(event); - return responseHandler.handleEdgeResponse(version, event, response); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleEdgeResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } @Override - public String updateVertex(String version, String id, String type, VertexPayload payload) throws CrudException { + public ImmutablePair updateVertex(String version, String id, String type, VertexPayload payload) + throws CrudException { Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties()); GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); GraphEventEnvelope response = sendAndWait(event); - return responseHandler.handleVertexResponse(version, event, response); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleVertexResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } @Override - public String patchVertex(String version, String id, String type, VertexPayload payload) throws CrudException { - Vertex existingVertex = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, + public ImmutablePair patchVertex(String version, String id, String type, VertexPayload payload) + throws CrudException { + OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap()); + Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version); Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type, payload.getProperties(), existingVertex); GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build(); GraphEventEnvelope response = sendAndWait(event); - return responseHandler.handleVertexResponse(version, event, response); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleVertexResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } @Override @@ -266,25 +326,48 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { } @Override - public String updateEdge(String version, String id, String type, EdgePayload payload) throws CrudException { - Edge edge = dao.getEdge(id, type, new HashMap()); + public ImmutablePair updateEdge(String version, String id, String type, EdgePayload payload) + throws CrudException { + OperationResult operationResult = dao.getEdge(id, type, new HashMap()); + Edge edge = Edge.fromJson(operationResult.getResult()); Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); GraphEventEnvelope response = sendAndWait(event); - return responseHandler.handleEdgeResponse(version, event, response); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleEdgeResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } @Override - public String patchEdge(String version, String id, String type, EdgePayload payload) throws CrudException { - Edge edge = dao.getEdge(id, type, new HashMap()); + public ImmutablePair patchEdge(String version, String id, String type, EdgePayload payload) + throws CrudException { + OperationResult operationResult = dao.getEdge(id, type, new HashMap()); + Edge edge = Edge.fromJson(operationResult.getResult()); Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload); GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build(); GraphEventEnvelope response = sendAndWait(event); - return responseHandler.handleEdgeResponse(version, event, response); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleEdgeResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } @PreDestroy @@ -292,8 +375,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { timer.cancel(); } - @Override - protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { + private Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); event.setDbTransactionId(dbTransId); @@ -301,8 +383,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getVertex().toVertex(); } - @Override - protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { + private Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); event.setDbTransactionId(dbTransId); @@ -310,16 +391,14 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getVertex().toVertex(); } - @Override - protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { + private void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) .vertex(new GraphEventVertex(id, version, type, null)).build(); event.setDbTransactionId(dbTransId); publishEvent(event); } - @Override - protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + private Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); event.setDbTransactionId(dbTransId); @@ -327,8 +406,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getEdge().toEdge(); } - @Override - protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + private Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); event.setDbTransactionId(dbTransId); @@ -336,17 +414,218 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getEdge().toEdge(); } - @Override - protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException { + private void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException { + // Get the edge type + String type = null; + try { + Edge edge = daoForGet.getEdge(id); + type = edge.getType(); + } + catch (CrudException ex) { + // Likely the client is trying to delete an edge which isn't present. Just swallow the exception + // and let the bulk request fail via the normal path. + } + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); event.setDbTransactionId(dbTransId); publishEvent(event); } + @Override + public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException { + HashMap vertices = new HashMap<>(); + HashMap edges = new HashMap<>(); + + String txId = dao.openTransaction(); + + try { + // Step 1. Handle edge deletes (must happen before vertex deletes) + for (JsonElement v : payload.getRelationships()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); + + if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { + deleteBulkEdge(edgePayload.getId(), version, txId); + } + } + + // Step 2: Handle vertex deletes + for (JsonElement v : payload.getObjects()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); + + if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { + String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()); + deleteBulkVertex(vertexPayload.getId(), version, type, txId); + } + } + + // Step 3: Handle vertex add/modify (must happen before edge adds) + for (JsonElement v : payload.getObjects()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); + + // Add vertex + if (opr.getValue().getAsString().equalsIgnoreCase("add")) { + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, true)); + Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(), + vertexPayload.getProperties()); + Vertex persistedVertex = addBulkVertex(validatedVertex, version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + + // Update vertex + else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, false)); + Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version, + vertexPayload.getType(), vertexPayload.getProperties()); + Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + + // Patch vertex + else if (opr.getValue().getAsString().equalsIgnoreCase("patch")) { + if ( (vertexPayload.getId() == null) || (vertexPayload.getType() == null) ) { + throw new CrudException("id and type must be specified for patch request", Status.BAD_REQUEST); + } + + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, false)); + + OperationResult existingVertexOpResult = dao.getVertex(vertexPayload.getId(), OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()), version, new HashMap()); + Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version); + Vertex validatedVertex = OxmModelValidator.validateIncomingPatchPayload(vertexPayload.getId(), + version, vertexPayload.getType(), vertexPayload.getProperties(), existingVertex); + Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + } + + // Step 4: Handle edge add/modify + for (JsonElement v : payload.getRelationships()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); + + // Add/Update edge + if (opr.getValue().getAsString().equalsIgnoreCase("add") + || opr.getValue().getAsString().equalsIgnoreCase("modify") + || opr.getValue().getAsString().equalsIgnoreCase("patch")) { + Edge validatedEdge; + Edge persistedEdge; + if (opr.getValue().getAsString().equalsIgnoreCase("add")) { + // Fix the source/destination + if (edgePayload.getSource().startsWith("$")) { + Vertex source = vertices.get(edgePayload.getSource().substring(1)); + if (source == null) { + throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1), + Status.INTERNAL_SERVER_ERROR); + } + edgePayload + .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get()); + } + if (edgePayload.getTarget().startsWith("$")) { + Vertex target = vertices.get(edgePayload.getTarget().substring(1)); + if (target == null) { + throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1), + Status.INTERNAL_SERVER_ERROR); + } + edgePayload + .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get()); + } + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(CrudServiceUtil.determineEdgeType(edgePayload, version)); + } + + validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),edgePayload); + persistedEdge = addBulkEdge(validatedEdge, version, txId); + } else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { + Edge edge = dao.getEdge(edgePayload.getId(), txId); + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(edge.getType()); + } + + validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload); + persistedEdge = updateBulkEdge(validatedEdge, version, txId); + } else { + if (edgePayload.getId() == null) { + throw new CrudException("id must be specified for patch request", Status.BAD_REQUEST); + } + Edge existingEdge = dao.getEdge(edgePayload.getId(), txId); + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(existingEdge.getType()); + } + + Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(existingEdge, version, edgePayload); + persistedEdge = updateBulkEdge(patchedEdge, version, txId); + } + + + Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge); + edges.put(item.getKey(), outgoingEdge); + } + } + + // commit transaction + dao.commitTransaction(txId); + } catch (CrudException ex) { + dao.rollbackTransaction(txId); + throw ex; + } catch (Exception ex) { + dao.rollbackTransaction(txId); + throw ex; + } finally { + if (dao.transactionExists(txId)) { + dao.rollbackTransaction(txId); + } + } + + return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload); + } + private GraphEvent publishEvent(GraphEvent event) throws CrudException { GraphEventEnvelope response = sendAndWait(event); responseHandler.handleBulkEventResponse(event, response); return response.getBody(); } -} +} \ No newline at end of file