Support for bulk API in async mode 57/27457/1
authorsblimkie <steven.blimkie@amdocs.com>
Thu, 4 Jan 2018 16:35:51 +0000 (11:35 -0500)
committersblimkie <steven.blimkie@amdocs.com>
Thu, 4 Jan 2018 16:53:36 +0000 (11:53 -0500)
Allow bulk request processing in Gizmo's async mode

Change-Id: I2aa86aaee9534e7a84bd360e97513ac5077485f2
Issue-ID: AAI-482
Signed-off-by: sblimkie <steven.blimkie@amdocs.com>
ASYNC.md
src/main/java/org/onap/crud/event/GraphEvent.java
src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java

index 2b72675..7dfc35c 100644 (file)
--- a/ASYNC.md
+++ b/ASYNC.md
@@ -35,7 +35,8 @@ original request.
             "type": "pserver",
             "schema-version": "vX"
         },
-        "transaction-id": "c0a81fa7-5ef4-49cd-ab39-e42c53c9b9a4"
+        "transaction-id": "c0a81fa7-5ef4-49cd-ab39-e42c53c9b9a4",
+        "database-transaction-id": "b3e2853e-f643-47a3-a0c3-cb54cc997ad3"
     }
 
 #### Updating a Vertex
@@ -60,7 +61,8 @@ original request.
             "type": "pserver",
             "schema-version": "vX"
         },
-        "transaction-id": "3b8df1d5-4c51-47e3-bbef-c27b47e11149"
+        "transaction-id": "3b8df1d5-4c51-47e3-bbef-c27b47e11149",
+        "database-transaction-id": "b3e2853e-f643-47a3-a0c3-cb54cc997ad3"
     }
 
 #### Deleting a Vertex
@@ -73,7 +75,8 @@ original request.
             "type": "pserver",
             "schema-version": "vX"
         },
-        "transaction-id": "6bb7a27b-a942-4cac-9b2b-0fa1f3897b8c"
+        "transaction-id": "6bb7a27b-a942-4cac-9b2b-0fa1f3897b8c",
+        "database-transaction-id": "b3e2853e-f643-47a3-a0c3-cb54cc997ad3"
     }
 
 
@@ -101,7 +104,8 @@ original request.
           "type": "vserver"
         }
       },
-      "transaction-id": "63a8994d-1118-4e65-ab06-fff40f6f48ef"
+      "transaction-id": "63a8994d-1118-4e65-ab06-fff40f6f48ef",
+      "database-transaction-id": "b3e2853e-f643-47a3-a0c3-cb54cc997ad3"
     }
 
 #### Replace an Edge
@@ -128,7 +132,8 @@ original request.
           "type": "vserver"
         }
       },
-      "transaction-id": "ed284991-6c2f-4c94-a592-76fed17a2f14"
+      "transaction-id": "ed284991-6c2f-4c94-a592-76fed17a2f14",
+      "database-transaction-id": "b3e2853e-f643-47a3-a0c3-cb54cc997ad3"
     }
 
 
@@ -142,5 +147,6 @@ original request.
         "type": "tosca.relationships.HostedOn",
         "schema-version": "v11"
       },
-      "transaction-id": "b4583bc9-dd96-483f-ab2d-20c1c6e5622f"
+      "transaction-id": "b4583bc9-dd96-483f-ab2d-20c1c6e5622f",
+      "database-transaction-id": "b3e2853e-f643-47a3-a0c3-cb54cc997ad3"
     }
index b841389..3cfcd8e 100644 (file)
@@ -45,6 +45,9 @@ public class GraphEvent {
 
   @SerializedName("transaction-id")
   private String transactionId;
+  
+  @SerializedName("database-transaction-id")
+  private String dbTransactionId;
 
   private long timestamp;
 
@@ -76,6 +79,14 @@ public class GraphEvent {
   public String getTransactionId() {
     return transactionId;
   }
+  
+  public String getDbTransactionId() {
+    return dbTransactionId;
+  }
+  
+  public void setDbTransactionId(String id) {
+    dbTransactionId = id;
+  }
 
   public long getTimestamp() {
     return timestamp;
@@ -173,9 +184,8 @@ public class GraphEvent {
     } else if (this.getEdge() != null) {
       return this.getEdge().getId();
     }
-
+    
     return null;
-
   }
 
   public String getObjectType() {
@@ -186,7 +196,6 @@ public class GraphEvent {
     }
 
     return null;
-
   }
 
   public static class Builder {
index 360a7dc..840576e 100644 (file)
@@ -25,6 +25,7 @@ package org.onap.crud.service;
 
 import com.att.ecomp.event.api.EventConsumer;
 import com.att.ecomp.event.api.EventPublisher;
+import com.google.gson.JsonElement;
 
 import org.onap.aai.cl.api.LogFields;
 import org.onap.aai.cl.api.Logger;
@@ -48,6 +49,10 @@ import org.onap.schema.OxmModelValidator;
 import org.onap.schema.RelationshipSchemaValidator;
 
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -206,20 +211,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version,
                                                                                                      response.getVertex().toVertex()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -234,20 +231,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -262,20 +251,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -294,20 +275,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -321,18 +294,10 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -346,18 +311,10 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -374,20 +331,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
                                                                                                              response.getEdge().toEdge()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -404,20 +353,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -432,8 +373,168 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
   @Override
   public String addBulk(String version, BulkPayload payload) throws CrudException {
-    throw new CrudException("Bulk operation not supported in async mode", Status.BAD_REQUEST);
-  }
+    HashMap<String, Vertex> vertices = new HashMap<String, Vertex>();
+    HashMap<String, Edge> edges = new HashMap<String, Edge>();
+    String txId = dao.openTransaction();   
+     
+    try {
+      // Handle vertices
+      for (JsonElement v : payload.getObjects()) {
+        List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
+            v.getAsJsonObject().entrySet());
 
+        if (entries.size() != 2) {
+          throw new CrudException("", Status.BAD_REQUEST);
+        }
+        Map.Entry<String, JsonElement> opr = entries.get(0);
+        Map.Entry<String, JsonElement> item = entries.get(1);
+
+        VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
+
+        if (opr.getValue().getAsString().equalsIgnoreCase("add")
+            || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
+          Vertex validatedVertex;
+          GraphEvent event;
+          
+          if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
+            // Publish add-vertex event
+            validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(),
+                vertexPayload.getProperties());
+            event = GraphEvent.builder(GraphEventOperation.CREATE)
+                .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();
+          } else {
+            // Publish update-vertex event
+            validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version,
+                vertexPayload.getType(), vertexPayload.getProperties());
+            event = GraphEvent.builder(GraphEventOperation.UPDATE)
+                .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();           
+          }
+          
+          event.setDbTransactionId(txId);
+          GraphEvent response = publishEvent(event);
+          Vertex persistedVertex = response.getVertex().toVertex();
+          Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
+          vertices.put(item.getKey(), outgoingVertex);
+        } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
+          // Publish delete-vertex event
+          String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType());
+          GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+            .vertex(new GraphEventVertex(vertexPayload.getId(), version, type, null)).build();
+          event.setDbTransactionId(txId);
+          publishEvent(event); 
+        }
+      }
+      
+      // Handle Edges
+      for (JsonElement v : payload.getRelationships()) {
+        List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
+            v.getAsJsonObject().entrySet());
+
+        if (entries.size() != 2) {
+          throw new CrudException("", Status.BAD_REQUEST);
+        }
+        Map.Entry<String, JsonElement> opr = entries.get(0);
+        Map.Entry<String, JsonElement> item = entries.get(1);
+
+        EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
+
+        if (opr.getValue().getAsString().equalsIgnoreCase("add")
+            || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
+          Edge validatedEdge;
+          Edge persistedEdge;
+          if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
+            // Fix the source/destination
+            if (edgePayload.getSource().startsWith("$")) {
+              Vertex source = vertices.get(edgePayload.getSource().substring(1));
+              if (source == null) {
+                throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1),
+                    Status.INTERNAL_SERVER_ERROR);
+              }
+              edgePayload
+                  .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get());
+            }
+            if (edgePayload.getTarget().startsWith("$")) {
+              Vertex target = vertices.get(edgePayload.getTarget().substring(1));
+              if (target == null) {
+                throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1),
+                    Status.INTERNAL_SERVER_ERROR);
+              }
+              edgePayload
+                  .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get());
+            }
+            validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),
+                edgePayload);
+            GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+                .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
+            event.setDbTransactionId(txId);
+            GraphEvent response = publishEvent(event);
+            persistedEdge =  response.getEdge().toEdge();
+          } else {
+            Edge edge = dao.getEdge(edgePayload.getId(), edgePayload.getType(), txId);
+            validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload);
+            GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+                .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
+            event.setDbTransactionId(txId);
+            GraphEvent response = publishEvent(event);
+            persistedEdge = response.getEdge().toEdge();
+          }
+
+          Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge);
+          edges.put(item.getKey(), outgoingEdge);
+        } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
+          RelationshipSchemaValidator.validateType(version, edgePayload.getType());
+          // Publish delete-vertex event
+          GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+            .edge(new GraphEventEdge(edgePayload.getId(), version, edgePayload.getType(), null, null, null)).build();
+          event.setDbTransactionId(txId);
+          publishEvent(event);
+        }
+      } 
+      
+      // commit transaction
+      dao.commitTransaction(txId);
+    } catch (CrudException ex) {
+      dao.rollbackTransaction(txId);
+      throw ex;
+    } catch (Exception ex) {
+      dao.rollbackTransaction(txId);
+      throw ex;
+    } finally {
+      if (dao.transactionExists(txId)) {
+        dao.rollbackTransaction(txId);
+      }
+    }
+    
+    return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload);
+  }
+  
+  private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+    GraphEvent response = sendAndWait(event);
+    if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+      logSuccessResponse(event, response);
+    } else {
+      logErrorResponse(event, response);
+      throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
+                              + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+    } 
+    
+    return response;
+  }
 
+  private void logSuccessResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult());
+  }
+  
+  private void logErrorResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult() + " , error: " + response.getErrorMessage());
+  }
+  
 }
\ No newline at end of file