X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fcrud%2Fservice%2FCrudAsyncGraphDataService.java;h=7906de0356ed7b6dcaf5115cc4b7eb282471d3a2;hb=f60a17c6abb6deef1c24f917488745cbc6e6a566;hp=aa70a661b9d54c540262a86adda75a3b7220a43e;hpb=194adee686ebb90488f739f2c637f6cb3def94d5;p=aai%2Fgizmo.git diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java index aa70a66..7906de0 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java @@ -23,7 +23,10 @@ package org.onap.crud.service; import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Timer; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -35,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.PreDestroy; import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.tuple.ImmutablePair; import org.onap.aai.cl.api.LogFields; @@ -56,13 +60,15 @@ import org.onap.crud.event.envelope.GraphEventEnvelope; import org.onap.crud.event.response.GraphEventResponseHandler; import org.onap.crud.exception.CrudException; import org.onap.crud.logging.CrudServiceMsgs; -import org.onap.crud.parser.EdgePayload; -import org.onap.crud.parser.VertexPayload; +import org.onap.crud.parser.CrudResponseBuilder; import org.onap.crud.util.CrudProperties; import org.onap.crud.util.CrudServiceConstants; +import org.onap.crud.util.CrudServiceUtil; import org.onap.crud.util.etag.EtagGenerator; -import org.onap.schema.validation.OxmModelValidator; -import org.onap.schema.validation.RelationshipSchemaValidator; +import org.onap.schema.OxmModelValidator; +import org.onap.schema.RelationshipSchemaValidator; + +import com.google.gson.JsonElement; public class CrudAsyncGraphDataService extends AbstractGraphDataService { @@ -121,8 +127,8 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { // Start the Response Consumer timer CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer( - asyncResponseConsumer, new GraphEventUpdater() - ); + asyncResponseConsumer, new GraphEventUpdater() + ); timer = new Timer("crudAsyncResponseConsumer-1"); timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); @@ -180,8 +186,8 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString()); + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString()); ExecutorService executor = Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); @@ -369,8 +375,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { timer.cancel(); } - @Override - protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { + private Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); event.setDbTransactionId(dbTransId); @@ -378,8 +383,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getVertex().toVertex(); } - @Override - protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { + private Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); event.setDbTransactionId(dbTransId); @@ -387,16 +391,14 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getVertex().toVertex(); } - @Override - protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { + private void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) .vertex(new GraphEventVertex(id, version, type, null)).build(); event.setDbTransactionId(dbTransId); publishEvent(event); } - @Override - protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + private Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); event.setDbTransactionId(dbTransId); @@ -404,8 +406,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getEdge().toEdge(); } - @Override - protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + private Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); event.setDbTransactionId(dbTransId); @@ -413,14 +414,215 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getEdge().toEdge(); } - @Override - protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException { + private void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException { + // Get the edge type + String type = null; + try { + Edge edge = daoForGet.getEdge(id); + type = edge.getType(); + } + catch (CrudException ex) { + // Likely the client is trying to delete an edge which isn't present. Just swallow the exception + // and let the bulk request fail via the normal path. + } + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); event.setDbTransactionId(dbTransId); publishEvent(event); } + @Override + public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException { + HashMap vertices = new HashMap<>(); + HashMap edges = new HashMap<>(); + + String txId = dao.openTransaction(); + + try { + // Step 1. Handle edge deletes (must happen before vertex deletes) + for (JsonElement v : payload.getRelationships()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); + + if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { + deleteBulkEdge(edgePayload.getId(), version, txId); + } + } + + // Step 2: Handle vertex deletes + for (JsonElement v : payload.getObjects()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); + + if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { + String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()); + deleteBulkVertex(vertexPayload.getId(), version, type, txId); + } + } + + // Step 3: Handle vertex add/modify (must happen before edge adds) + for (JsonElement v : payload.getObjects()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); + + // Add vertex + if (opr.getValue().getAsString().equalsIgnoreCase("add")) { + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, true)); + Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(), + vertexPayload.getProperties()); + Vertex persistedVertex = addBulkVertex(validatedVertex, version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + + // Update vertex + else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, false)); + Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version, + vertexPayload.getType(), vertexPayload.getProperties()); + Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + + // Patch vertex + else if (opr.getValue().getAsString().equalsIgnoreCase("patch")) { + if ( (vertexPayload.getId() == null) || (vertexPayload.getType() == null) ) { + throw new CrudException("id and type must be specified for patch request", Status.BAD_REQUEST); + } + + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, false)); + + OperationResult existingVertexOpResult = dao.getVertex(vertexPayload.getId(), OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()), version, new HashMap()); + Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version); + Vertex validatedVertex = OxmModelValidator.validateIncomingPatchPayload(vertexPayload.getId(), + version, vertexPayload.getType(), vertexPayload.getProperties(), existingVertex); + Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + } + + // Step 4: Handle edge add/modify + for (JsonElement v : payload.getRelationships()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); + + // Add/Update edge + if (opr.getValue().getAsString().equalsIgnoreCase("add") + || opr.getValue().getAsString().equalsIgnoreCase("modify") + || opr.getValue().getAsString().equalsIgnoreCase("patch")) { + Edge validatedEdge; + Edge persistedEdge; + if (opr.getValue().getAsString().equalsIgnoreCase("add")) { + // Fix the source/destination + if (edgePayload.getSource().startsWith("$")) { + Vertex source = vertices.get(edgePayload.getSource().substring(1)); + if (source == null) { + throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1), + Status.INTERNAL_SERVER_ERROR); + } + edgePayload + .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get()); + } + if (edgePayload.getTarget().startsWith("$")) { + Vertex target = vertices.get(edgePayload.getTarget().substring(1)); + if (target == null) { + throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1), + Status.INTERNAL_SERVER_ERROR); + } + edgePayload + .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get()); + } + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(CrudServiceUtil.determineEdgeType(edgePayload, version)); + } + + validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),edgePayload); + persistedEdge = addBulkEdge(validatedEdge, version, txId); + } else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { + Edge edge = dao.getEdge(edgePayload.getId(), txId); + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(edge.getType()); + } + + validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload); + persistedEdge = updateBulkEdge(validatedEdge, version, txId); + } else { + if (edgePayload.getId() == null) { + throw new CrudException("id must be specified for patch request", Status.BAD_REQUEST); + } + Edge existingEdge = dao.getEdge(edgePayload.getId(), txId); + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(existingEdge.getType()); + } + + Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(existingEdge, version, edgePayload); + persistedEdge = updateBulkEdge(patchedEdge, version, txId); + } + + + Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge); + edges.put(item.getKey(), outgoingEdge); + } + } + + // commit transaction + dao.commitTransaction(txId); + } catch (CrudException ex) { + dao.rollbackTransaction(txId); + throw ex; + } catch (Exception ex) { + dao.rollbackTransaction(txId); + throw ex; + } finally { + if (dao.transactionExists(txId)) { + dao.rollbackTransaction(txId); + } + } + + return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload); + } + private GraphEvent publishEvent(GraphEvent event) throws CrudException { GraphEventEnvelope response = sendAndWait(event); responseHandler.handleBulkEventResponse(event, response);