Support for bulk API in async mode
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
index 360a7dc..840576e 100644 (file)
@@ -25,6 +25,7 @@ package org.onap.crud.service;
 
 import com.att.ecomp.event.api.EventConsumer;
 import com.att.ecomp.event.api.EventPublisher;
+import com.google.gson.JsonElement;
 
 import org.onap.aai.cl.api.LogFields;
 import org.onap.aai.cl.api.Logger;
@@ -48,6 +49,10 @@ import org.onap.schema.OxmModelValidator;
 import org.onap.schema.RelationshipSchemaValidator;
 
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -206,20 +211,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version,
                                                                                                      response.getVertex().toVertex()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -234,20 +231,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -262,20 +251,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -294,20 +275,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -321,18 +294,10 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -346,18 +311,10 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -374,20 +331,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
                                                                                                              response.getEdge().toEdge()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -404,20 +353,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -432,8 +373,168 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
   @Override
   public String addBulk(String version, BulkPayload payload) throws CrudException {
-    throw new CrudException("Bulk operation not supported in async mode", Status.BAD_REQUEST);
-  }
+    HashMap<String, Vertex> vertices = new HashMap<String, Vertex>();
+    HashMap<String, Edge> edges = new HashMap<String, Edge>();
+    String txId = dao.openTransaction();   
+     
+    try {
+      // Handle vertices
+      for (JsonElement v : payload.getObjects()) {
+        List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
+            v.getAsJsonObject().entrySet());
 
+        if (entries.size() != 2) {
+          throw new CrudException("", Status.BAD_REQUEST);
+        }
+        Map.Entry<String, JsonElement> opr = entries.get(0);
+        Map.Entry<String, JsonElement> item = entries.get(1);
+
+        VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
+
+        if (opr.getValue().getAsString().equalsIgnoreCase("add")
+            || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
+          Vertex validatedVertex;
+          GraphEvent event;
+          
+          if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
+            // Publish add-vertex event
+            validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(),
+                vertexPayload.getProperties());
+            event = GraphEvent.builder(GraphEventOperation.CREATE)
+                .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();
+          } else {
+            // Publish update-vertex event
+            validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version,
+                vertexPayload.getType(), vertexPayload.getProperties());
+            event = GraphEvent.builder(GraphEventOperation.UPDATE)
+                .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();           
+          }
+          
+          event.setDbTransactionId(txId);
+          GraphEvent response = publishEvent(event);
+          Vertex persistedVertex = response.getVertex().toVertex();
+          Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
+          vertices.put(item.getKey(), outgoingVertex);
+        } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
+          // Publish delete-vertex event
+          String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType());
+          GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+            .vertex(new GraphEventVertex(vertexPayload.getId(), version, type, null)).build();
+          event.setDbTransactionId(txId);
+          publishEvent(event); 
+        }
+      }
+      
+      // Handle Edges
+      for (JsonElement v : payload.getRelationships()) {
+        List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
+            v.getAsJsonObject().entrySet());
+
+        if (entries.size() != 2) {
+          throw new CrudException("", Status.BAD_REQUEST);
+        }
+        Map.Entry<String, JsonElement> opr = entries.get(0);
+        Map.Entry<String, JsonElement> item = entries.get(1);
+
+        EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
+
+        if (opr.getValue().getAsString().equalsIgnoreCase("add")
+            || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
+          Edge validatedEdge;
+          Edge persistedEdge;
+          if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
+            // Fix the source/destination
+            if (edgePayload.getSource().startsWith("$")) {
+              Vertex source = vertices.get(edgePayload.getSource().substring(1));
+              if (source == null) {
+                throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1),
+                    Status.INTERNAL_SERVER_ERROR);
+              }
+              edgePayload
+                  .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get());
+            }
+            if (edgePayload.getTarget().startsWith("$")) {
+              Vertex target = vertices.get(edgePayload.getTarget().substring(1));
+              if (target == null) {
+                throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1),
+                    Status.INTERNAL_SERVER_ERROR);
+              }
+              edgePayload
+                  .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get());
+            }
+            validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),
+                edgePayload);
+            GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+                .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
+            event.setDbTransactionId(txId);
+            GraphEvent response = publishEvent(event);
+            persistedEdge =  response.getEdge().toEdge();
+          } else {
+            Edge edge = dao.getEdge(edgePayload.getId(), edgePayload.getType(), txId);
+            validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload);
+            GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+                .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
+            event.setDbTransactionId(txId);
+            GraphEvent response = publishEvent(event);
+            persistedEdge = response.getEdge().toEdge();
+          }
+
+          Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge);
+          edges.put(item.getKey(), outgoingEdge);
+        } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
+          RelationshipSchemaValidator.validateType(version, edgePayload.getType());
+          // Publish delete-vertex event
+          GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+            .edge(new GraphEventEdge(edgePayload.getId(), version, edgePayload.getType(), null, null, null)).build();
+          event.setDbTransactionId(txId);
+          publishEvent(event);
+        }
+      } 
+      
+      // commit transaction
+      dao.commitTransaction(txId);
+    } catch (CrudException ex) {
+      dao.rollbackTransaction(txId);
+      throw ex;
+    } catch (Exception ex) {
+      dao.rollbackTransaction(txId);
+      throw ex;
+    } finally {
+      if (dao.transactionExists(txId)) {
+        dao.rollbackTransaction(txId);
+      }
+    }
+    
+    return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload);
+  }
+  
+  private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+    GraphEvent response = sendAndWait(event);
+    if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+      logSuccessResponse(event, response);
+    } else {
+      logErrorResponse(event, response);
+      throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
+                              + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+    } 
+    
+    return response;
+  }
 
+  private void logSuccessResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult());
+  }
+  
+  private void logErrorResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult() + " , error: " + response.getErrorMessage());
+  }
+  
 }
\ No newline at end of file