Add event client dependency
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
index 360a7dc..d674d0e 100644 (file)
@@ -23,8 +23,8 @@
  */
 package org.onap.crud.service;
 
-import com.att.ecomp.event.api.EventConsumer;
-import com.att.ecomp.event.api.EventPublisher;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
 
 import org.onap.aai.cl.api.LogFields;
 import org.onap.aai.cl.api.Logger;
@@ -48,6 +48,7 @@ import org.onap.schema.OxmModelValidator;
 import org.onap.schema.RelationshipSchemaValidator;
 
 import java.text.SimpleDateFormat;
+import java.util.HashMap;
 import java.util.Timer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -84,12 +85,21 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
   public static Integer getRequestTimeOut() {
     return requestTimeOut;
   }
+  
+  public CrudAsyncGraphDataService(GraphDao dao, 
+          EventPublisher asyncRequestPublisher,
+          EventConsumer asyncResponseConsumer) throws CrudException {
+      this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
+  }
 
   public CrudAsyncGraphDataService(GraphDao dao, 
+          GraphDao daoForGet, 
                  EventPublisher asyncRequestPublisher,
                  EventConsumer asyncResponseConsumer) throws CrudException {
 
-     super(dao);
+    super();
+    this.dao = dao;
+    this.daoForGet = daoForGet;
      
     requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
     try {
@@ -206,20 +216,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version,
                                                                                                      response.getVertex().toVertex()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -234,20 +236,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -262,20 +256,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -285,7 +271,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
   public String patchVertex(String version, String id, String type, VertexPayload payload)
     throws CrudException {
     Vertex existingVertex
-      = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
+      = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>());
     Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
                                                                           type, payload.getProperties(),
                                                                           existingVertex);
@@ -294,20 +280,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertVertexResponse(
                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
                                                            version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -321,18 +299,10 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -346,18 +316,10 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return "";
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -366,7 +328,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
   public String updateEdge(String version, String id, String type, EdgePayload payload)
     throws CrudException {
-    Edge edge = dao.getEdge(id, type);
+    Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
     Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
                                                                                    payload);
     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
@@ -374,20 +336,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
                                                                                                              response.getEdge().toEdge()), version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -396,7 +350,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
   public String patchEdge(String version, String id, String type, EdgePayload payload)
     throws CrudException {
-    Edge edge = dao.getEdge(id, type);
+    Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
     Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
                                                                                 payload);
     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
@@ -404,20 +358,12 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
 
     GraphEvent response = sendAndWait(event);
     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult());
+      logSuccessResponse(event, response);
       return CrudResponseBuilder.buildUpsertEdgeResponse(
                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
                                                          version);
     } else {
-      logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
-                  "Event response received: " + response.getObjectType() + " with key: "
-                  + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
-                  + " , operation: " + event.getOperation().toString() + " , result: "
-                  + response.getResult() + " , error: " + response.getErrorMessage());
+      logErrorResponse(event, response);
       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
     }
@@ -429,11 +375,82 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService {
     timer.cancel();
 
   }
-
+  
   @Override
-  public String addBulk(String version, BulkPayload payload) throws CrudException {
-    throw new CrudException("Bulk operation not supported in async mode", Status.BAD_REQUEST);
+  protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+        .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event); 
+    return response.getVertex().toVertex();
+  }
+  
+  @Override
+  protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+        .vertex(GraphEventVertex.fromVertex(vertex, version)).build();    
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event);
+    return response.getVertex().toVertex();
+  }
+  
+  @Override
+  protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
+    event.setDbTransactionId(dbTransId);
+    publishEvent(event); 
+  }
+  
+  @Override
+  protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event);
+    return response.getEdge().toEdge();
+  }
+  
+  @Override
+  protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+    event.setDbTransactionId(dbTransId);
+    GraphEvent response = publishEvent(event);
+    return response.getEdge().toEdge();
+  }
+  
+  @Override
+  protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
+    GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+        .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+    event.setDbTransactionId(dbTransId);
+    publishEvent(event);
+  }
+  
+  private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+    GraphEvent response = sendAndWait(event);
+    if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+      logSuccessResponse(event, response);
+    } else {
+      logErrorResponse(event, response);
+      throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
+                              + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+    } 
+    
+    return response;
   }
 
-
+  private void logSuccessResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult());
+  }
+  
+  private void logErrorResponse(GraphEvent event, GraphEvent response) {
+    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+        "Event response received: " + response.getObjectType() + " with key: "
+        + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+        + " , operation: " + event.getOperation().toString() + " , result: "
+        + response.getResult() + " , error: " + response.getErrorMessage());
+  }
 }
\ No newline at end of file