Add event client dependency
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
index 9efc7df..d674d0e 100644 (file)
@@ -23,8 +23,8 @@
  */
 package org.onap.crud.service;
 
-import com.att.ecomp.event.api.EventConsumer;
-import com.att.ecomp.event.api.EventPublisher;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
 
 import org.onap.aai.cl.api.LogFields;
 import org.onap.aai.cl.api.Logger;
@@ -44,13 +44,11 @@ import org.onap.crud.logging.CrudServiceMsgs;
 import org.onap.crud.parser.CrudResponseBuilder;
 import org.onap.crud.util.CrudProperties;
 import org.onap.crud.util.CrudServiceConstants;
-import org.onap.crud.util.CrudServiceUtil;
 import org.onap.schema.OxmModelValidator;
 import org.onap.schema.RelationshipSchemaValidator;
 
 import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
+import java.util.HashMap;
 import java.util.Timer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -63,12 +61,10 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.PreDestroy;
 import javax.ws.rs.core.Response.Status;
 
-public class CrudAsyncGraphDataService {
+public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
   private static Integer requestTimeOut;
 
-  private GraphDao dao;
-
   private EventPublisher asyncRequestPublisher;
 
   private Timer timer;
@@ -89,11 +85,22 @@ public class CrudAsyncGraphDataService {
   public static Integer getRequestTimeOut() {
     return requestTimeOut;
   }
+  
+  public CrudAsyncGraphDataService(GraphDao dao, 
+          EventPublisher asyncRequestPublisher,
+          EventConsumer asyncResponseConsumer) throws CrudException {
+      this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
+  }
 
   public CrudAsyncGraphDataService(GraphDao dao, 
+          GraphDao daoForGet, 
                  EventPublisher asyncRequestPublisher,
                  EventConsumer asyncResponseConsumer) throws CrudException {
 
+    super();
+    this.dao = dao;
+    this.daoForGet = daoForGet;
+     
     requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
     try {
       requestTimeOut
@@ -113,8 +120,6 @@ public class CrudAsyncGraphDataService {
                    + " error: " + ex.getMessage());
     }
 
-    this.dao = dao;
-
     // Start the Response Consumer timer
     CrudAsyncResponseConsumer crudAsyncResponseConsumer
       = new CrudAsyncResponseConsumer(asyncResponseConsumer);
@@ -122,9 +127,7 @@ public class CrudAsyncGraphDataService {
     timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
 
     this.asyncRequestPublisher = asyncRequestPublisher;
-
-    // load the schemas
-    CrudServiceUtil.loadModels();
+    
     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
                 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
   }
@@ -155,7 +158,7 @@ public class CrudAsyncGraphDataService {
     }
   }
 
-  private GraphEvent sendAndWait(GraphEvent event) throws Exception {
+  private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
 
     long startTimeInMs = System.currentTimeMillis();
     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
@@ -163,7 +166,13 @@ public class CrudAsyncGraphDataService {
     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
 
     // publish to request queue
-    asyncRequestPublisher.sendSync(event.toJson());
+    try {
+      asyncRequestPublisher.sendSync(event.toJson());
+    } catch (Exception e) {
+      throw new CrudException("Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
+    }
+    
+    logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
 
     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
@@ -197,7 +206,7 @@ public class CrudAsyncGraphDataService {
     return response;
   }
 
-  public String addVertex(String version, String type, VertexPayload payload) throws Exception {
+  public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
     // Validate the incoming payload
     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
                                                                     type, payload.getProperties());
@@ -207,27 +216,19 @@ public class CrudAsyncGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version,
                                                                                                      response.getVertex().toVertex()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
 
   }
 
-  public String addEdge(String version, String type, EdgePayload payload) throws Exception {
+  public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
     Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
     // Create graph request event
     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
@@ -235,27 +236,19 @@ public class CrudAsyncGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
   }
 
   public String updateVertex(String version, String id, String type, VertexPayload payload)
-    throws Exception {
+    throws CrudException {
     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
                                                                     type, payload.getProperties());
     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
@@ -263,20 +256,12 @@ public class CrudAsyncGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -284,9 +269,9 @@ public class CrudAsyncGraphDataService {
   }
 
   public String patchVertex(String version, String id, String type, VertexPayload payload)
-    throws Exception {
+    throws CrudException {
     Vertex existingVertex
-      = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
+      = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>());
     Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
                                                                           type, payload.getProperties(),
                                                                           existingVertex);
@@ -295,70 +280,46 @@ public class CrudAsyncGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
 
   }
 
-  public String deleteVertex(String version, String id, String type) throws Exception {
+  public String deleteVertex(String version, String id, String type) throws CrudException {
     type = OxmModelValidator.resolveCollectionType(version, type);
     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
       .vertex(new GraphEventVertex(id, version, type, null)).build();
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
 
   }
 
-  public String deleteEdge(String version, String id, String type) throws Exception {
+  public String deleteEdge(String version, String id, String type) throws CrudException {
     RelationshipSchemaValidator.validateType(version, type);
     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
       .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -366,8 +327,8 @@ public class CrudAsyncGraphDataService {
   }
 
   public String updateEdge(String version, String id, String type, EdgePayload payload)
-    throws Exception {
-    Edge edge = dao.getEdge(id, type);
+    throws CrudException {
+    Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
     Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
                                                                                    payload);
     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
@@ -375,20 +336,12 @@ public class CrudAsyncGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
                                                                                                              response.getEdge().toEdge()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -396,8 +349,8 @@ public class CrudAsyncGraphDataService {
   }
 
   public String patchEdge(String version, String id, String type, EdgePayload payload)
-    throws Exception {
-    Edge edge = dao.getEdge(id, type);
+    throws CrudException {
+    Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
     Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
                                                                                 payload);
     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
@@ -405,65 +358,99 @@ public class CrudAsyncGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
 
   }
 
-  public String getEdge(String version, String id, String type) throws CrudException {
-    RelationshipSchemaValidator.validateType(version, type);
-    Edge edge = dao.getEdge(id, type);
+  @PreDestroy
+  protected void preShutdown() {
+    timer.cancel();
 
-    return CrudResponseBuilder.buildGetEdgeResponse(RelationshipSchemaValidator
-                                                    .validateOutgoingPayload(version, edge),
-                                                    version);
   }
-
-  public String getEdges(String version, String type, Map<String, String> filter)
-    throws CrudException {
-    RelationshipSchemaValidator.validateType(version, type);
-    List<Edge> items = dao.getEdges(type,
-                                    RelationshipSchemaValidator.resolveCollectionfilter(version, type, filter));
-    return CrudResponseBuilder.buildGetEdgesResponse(items, version);
+  
+  @Override
+  protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+        .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event); 
+    return response.getVertex().toVertex();
   }
-
-  public String getVertex(String version, String id, String type) throws CrudException {
-    type = OxmModelValidator.resolveCollectionType(version, type);
-    Vertex vertex = dao.getVertex(id, type);
-    List<Edge> edges = dao.getVertexEdges(id);
-    return CrudResponseBuilder.buildGetVertexResponse(OxmModelValidator
-                                                      .validateOutgoingPayload(version, vertex), edges,
-                                                      version);
+  
+  @Override
+  protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+        .vertex(GraphEventVertex.fromVertex(vertex, version)).build();    
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event);
+    return response.getVertex().toVertex();
   }
-
-  public String getVertices(String version, String type, Map<String, String> filter)
-    throws CrudException {
-    type = OxmModelValidator.resolveCollectionType(version, type);
-    List<Vertex> items = dao.getVertices(type,
-                                         OxmModelValidator.resolveCollectionfilter(version, type, filter));
-    return CrudResponseBuilder.buildGetVerticesResponse(items, version);
+  
+  @Override
+  protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
+    event.setDbTransactionId(dbTransId);
+    publishEvent(event); 
   }
-
-  @PreDestroy
-  protected void preShutdown() {
-    timer.cancel();
-
+  
+  @Override
+  protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event);
+    return response.getEdge().toEdge();
+  }
+  
+  @Override
+  protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event);
+    return response.getEdge().toEdge();
+  }
+  
+  @Override
+  protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+        .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+    event.setDbTransactionId(dbTransId);
+    publishEvent(event);
+  }
+  
+  private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+    GraphEvent response = sendAndWait(event);
+    if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+      logSuccessResponse(event, response);
+    } else {
+      logErrorResponse(event, response);
+      throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
+                              + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+    } 
+    
+    return response;
   }
 
-
+  private void logSuccessResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult());
+  }
+  
+  private void logErrorResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult() + " , error: " + response.getErrorMessage());
+  }
 }
\ No newline at end of file