/**
* ============LICENSE_START=======================================================
- * Gizmo
+ * org.onap.aai
* ================================================================================
- * Copyright © 2017 AT&T Intellectual Property.
- * Copyright © 2017 Amdocs
- * All rights reserved.
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
package org.onap.crud.service;
-import com.att.ecomp.event.api.EventConsumer;
-import com.att.ecomp.event.api.EventPublisher;
-
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.PreDestroy;
+import javax.ws.rs.core.EntityTag;
+import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.onap.aai.cl.api.LogFields;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.cl.mdc.MdcOverride;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
+import org.onap.aai.restclient.client.OperationResult;
import org.onap.crud.dao.GraphDao;
import org.onap.crud.entity.Edge;
import org.onap.crud.entity.Vertex;
import org.onap.crud.event.GraphEvent;
import org.onap.crud.event.GraphEvent.GraphEventOperation;
-import org.onap.crud.event.GraphEvent.GraphEventResult;
import org.onap.crud.event.GraphEventEdge;
import org.onap.crud.event.GraphEventVertex;
+import org.onap.crud.event.envelope.GraphEventEnvelope;
+import org.onap.crud.event.response.GraphEventResponseHandler;
import org.onap.crud.exception.CrudException;
import org.onap.crud.logging.CrudServiceMsgs;
-import org.onap.crud.parser.CrudResponseBuilder;
import org.onap.crud.util.CrudProperties;
import org.onap.crud.util.CrudServiceConstants;
-import org.onap.crud.util.CrudServiceUtil;
+import org.onap.crud.util.etag.EtagGenerator;
import org.onap.schema.OxmModelValidator;
import org.onap.schema.RelationshipSchemaValidator;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.PreDestroy;
-import javax.ws.rs.core.Response.Status;
+public class CrudAsyncGraphDataService extends AbstractGraphDataService {
+
+ private static Integer requestTimeOut;
+
+ private EventPublisher asyncRequestPublisher;
+
+ private Timer timer;
+
+ public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
+ private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
+ private static Logger metricsLogger =
+ LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
+ private static LogFields okFields = new LogFields();
+ private EtagGenerator etagGenerator;
-public class CrudAsyncGraphDataService {
+ static {
+ okFields.setField(Status.OK, Status.OK.toString());
+ }
- private static Integer requestTimeOut;
+ private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
- private GraphDao dao;
+ public static Integer getRequestTimeOut() {
+ return requestTimeOut;
+ }
- private EventPublisher asyncRequestPublisher;
+ public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
+ this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
+ }
- private Timer timer;
+ public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
- public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
- private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
+ super();
+ this.dao = dao;
+ this.daoForGet = daoForGet;
- private static Logger logger = LoggerFactory.getInstance()
- .getLogger(CrudAsyncGraphDataService.class.getName());
- private static Logger metricsLogger = LoggerFactory.getInstance()
- .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
- private static LogFields OK_FIELDS = new LogFields();
+ requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
+ try {
+ requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
+ } catch (NumberFormatException ex) {
+ // Leave it as the default
+ }
- static {
- OK_FIELDS.setField(Status.OK, Status.OK.toString());
- }
+ Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
+ try {
+ responsePollInterval =
+ Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
+ } catch (Exception ex) {
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
+ + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
+ }
- public static Integer getRequestTimeOut() {
- return requestTimeOut;
- }
+ // Start the Response Consumer timer
+ CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(
+ asyncResponseConsumer, new GraphEventUpdater()
+ );
+ timer = new Timer("crudAsyncResponseConsumer-1");
+ timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
- public CrudAsyncGraphDataService(GraphDao dao,
- EventPublisher asyncRequestPublisher,
- EventConsumer asyncResponseConsumer) throws CrudException {
+ this.asyncRequestPublisher = asyncRequestPublisher;
+ this.etagGenerator = new EtagGenerator();
- requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
- try {
- requestTimeOut
- = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
- } catch (NumberFormatException ex) {
- // Leave it as the default
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
}
- Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
- try {
- responsePollInterval = Integer
- .parseInt(CrudProperties
- .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
- } catch (Exception ex) {
- logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
- + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
- + " error: " + ex.getMessage());
+ public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
+ private volatile GraphEventEnvelope graphEventEnvelope;
+ private volatile CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public GraphEventEnvelope call() throws TimeoutException {
+ try {
+ // Wait until graphEvent is available
+ latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ latch.countDown();
+ if (this.graphEventEnvelope != null) {
+ return this.graphEventEnvelope;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+ return this.graphEventEnvelope;
+ }
+
+ public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
+ this.graphEventEnvelope = eventEnvelope;
+ latch.countDown();
+ }
}
- this.dao = dao;
+ private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
+
+ long startTimeInMs = System.currentTimeMillis();
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+ MdcOverride override = new MdcOverride();
+ override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
+
+ String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
- // Start the Response Consumer timer
- CrudAsyncResponseConsumer crudAsyncResponseConsumer
- = new CrudAsyncResponseConsumer(asyncResponseConsumer);
- timer = new Timer("crudAsyncResponseConsumer-1");
- timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
+ // publish to request queue
+ try {
+ asyncRequestPublisher.sendSync(eventEnvelopeJson);
+ } catch (Exception e) {
+ throw new CrudException(
+ "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(),
+ Status.INTERNAL_SERVER_ERROR);
+ }
- this.asyncRequestPublisher = asyncRequestPublisher;
+ logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
- // load the schemas
- CrudServiceUtil.loadModels();
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
- }
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString());
+
+ ExecutorService executor =
+ Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
+ CollectGraphResponse collector = new CollectGraphResponse();
+ CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
+ GraphEventEnvelope response;
+ Future<GraphEventEnvelope> future = executor.submit(collector);
+ try {
+ response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
+ "Request timed out for transactionId: " + event.getTransactionId());
+ future.cancel(true);
+ throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
+ } finally {
+ // Kill the thread as the work is completed
+ executor.shutdownNow();
+ }
+ metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
+ "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
+ + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
+ + " ms");
+ return response;
+ }
- public class CollectGraphResponse implements Callable<GraphEvent> {
- private volatile GraphEvent graphEvent;
- private volatile CountDownLatch latch = new CountDownLatch(1);
+ @Override
+ public ImmutablePair<EntityTag, String> addVertex(String version, String type, VertexPayload payload)
+ throws CrudException {
+ // Validate the incoming payload
+ Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
+ vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
+ // Create graph request event
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
+ } catch (IOException e) {
+ throw new CrudException(e);
+ }
+ String responsePayload = responseHandler.handleVertexResponse(version, event, response);
+
+ return new ImmutablePair<>(entityTag, responsePayload);
+ }
@Override
- public GraphEvent call() throws TimeoutException {
- try {
- // Wait until graphEvent is available
- latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- latch.countDown();
- if (this.graphEvent != null) {
- return this.graphEvent;
- } else {
- throw new TimeoutException();
+ public ImmutablePair<EntityTag, String> addEdge(String version, String type, EdgePayload payload)
+ throws CrudException {
+ Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
+
+ // Create graph request event
+ GraphEvent event =
+ GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
+ } catch (IOException e) {
+ throw new CrudException(e);
}
- }
- return this.graphEvent;
+ String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
+
+ return new ImmutablePair<>(entityTag, responsePayload);
}
- public void populateGraphEvent(GraphEvent event) {
- this.graphEvent = event;
- latch.countDown();
+ @Override
+ public ImmutablePair<EntityTag, String> updateVertex(String version, String id, String type, VertexPayload payload)
+ throws CrudException {
+ Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
+ } catch (IOException e) {
+ throw new CrudException(e);
+ }
+ String responsePayload = responseHandler.handleVertexResponse(version, event, response);
+
+ return new ImmutablePair<>(entityTag, responsePayload);
}
- }
- private GraphEvent sendAndWait(GraphEvent event) throws Exception {
+ @Override
+ public ImmutablePair<EntityTag, String> patchVertex(String version, String id, String type, VertexPayload payload)
+ throws CrudException {
+ OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
+ new HashMap<String, String>());
+ Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
+ Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
+ payload.getProperties(), existingVertex);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
+ } catch (IOException e) {
+ throw new CrudException(e);
+ }
+ String responsePayload = responseHandler.handleVertexResponse(version, event, response);
- long startTimeInMs = System.currentTimeMillis();
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
- MdcOverride override = new MdcOverride();
- override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
+ return new ImmutablePair<>(entityTag, responsePayload);
+ }
- // publish to request queue
- asyncRequestPublisher.sendSync(event.toJson());
+ @Override
+ public String deleteVertex(String version, String id, String type) throws CrudException {
+ type = OxmModelValidator.resolveCollectionType(version, type);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .vertex(new GraphEventVertex(id, version, type, null)).build();
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString());
-
- ExecutorService executor = Executors
- .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
- CollectGraphResponse collector = new CollectGraphResponse();
- CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
- GraphEvent response;
- Future<GraphEvent> future = executor.submit(collector);
- try {
- response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
-
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
- logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
- "Request timed out for transactionId: " + event.getTransactionId());
- future.cancel(true);
- throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
- + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
- } finally {
- //Kill the thread as the work is completed
- executor.shutdownNow();
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleDeletionResponse(event, response);
}
- metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
- "Total elapsed time for operation: " + event.getOperation().toString()
- + " , transactionId: " + event.getTransactionId() + " is "
- + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
- return response;
- }
-
- public String addVertex(String version, String type, VertexPayload payload) throws Exception {
- // Validate the incoming payload
- Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
- type, payload.getProperties());
- // Create graph request event
- GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
- .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return CrudResponseBuilder.buildUpsertVertexResponse(
- OxmModelValidator.validateOutgoingPayload(version,
- response.getVertex().toVertex()), version);
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+
+ @Override
+ public String deleteEdge(String version, String id, String type) throws CrudException {
+ RelationshipSchemaValidator.validateType(version, type);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleDeletionResponse(event, response);
}
- }
-
- public String addEdge(String version, String type, EdgePayload payload) throws Exception {
- Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
- // Create graph request event
- GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
- .edge(GraphEventEdge.fromEdge(edge, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return CrudResponseBuilder.buildUpsertEdgeResponse(
- RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
- version);
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ public ImmutablePair<EntityTag, String> updateEdge(String version, String id, String type, EdgePayload payload)
+ throws CrudException {
+ OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
+ Edge edge = Edge.fromJson(operationResult.getResult());
+ Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
+
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
+ } catch (IOException e) {
+ throw new CrudException(e);
+ }
+ String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
+
+ return new ImmutablePair<>(entityTag, responsePayload);
}
- }
-
- public String updateVertex(String version, String id, String type, VertexPayload payload)
- throws Exception {
- Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
- type, payload.getProperties());
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return CrudResponseBuilder.buildUpsertVertexResponse(
- OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
- version);
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+
+ @Override
+ public ImmutablePair<EntityTag, String> patchEdge(String version, String id, String type, EdgePayload payload)
+ throws CrudException {
+ OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
+ Edge edge = Edge.fromJson(operationResult.getResult());
+ Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+
+ EntityTag entityTag;
+ try {
+ entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
+ } catch (IOException e) {
+ throw new CrudException(e);
+ }
+ String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
+
+ return new ImmutablePair<>(entityTag, responsePayload);
}
- }
-
- public String patchVertex(String version, String id, String type, VertexPayload payload)
- throws Exception {
- Vertex existingVertex
- = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
- Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
- type, payload.getProperties(),
- existingVertex);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return CrudResponseBuilder.buildUpsertVertexResponse(
- OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
- version);
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @PreDestroy
+ protected void preShutdown() {
+ timer.cancel();
}
- }
-
- public String deleteVertex(String version, String id, String type) throws Exception {
- type = OxmModelValidator.resolveCollectionType(version, type);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
- .vertex(new GraphEventVertex(id, version, type, null)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return "";
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
}
- }
-
- public String deleteEdge(String version, String id, String type) throws Exception {
- RelationshipSchemaValidator.validateType(version, type);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
- .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return "";
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
}
- }
-
- public String updateEdge(String version, String id, String type, EdgePayload payload)
- throws Exception {
- Edge edge = dao.getEdge(id, type);
- Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
- payload);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return CrudResponseBuilder.buildUpsertEdgeResponse(
- RelationshipSchemaValidator.validateOutgoingPayload(version,
- response.getEdge().toEdge()), version);
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .vertex(new GraphEventVertex(id, version, type, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
}
- }
-
- public String patchEdge(String version, String id, String type, EdgePayload payload)
- throws Exception {
- Edge edge = dao.getEdge(id, type);
- Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
- payload);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- return CrudResponseBuilder.buildUpsertEdgeResponse(
- RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
- version);
- } else {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event =
+ GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
}
- }
-
- public String getEdge(String version, String id, String type) throws CrudException {
- RelationshipSchemaValidator.validateType(version, type);
- Edge edge = dao.getEdge(id, type);
-
- return CrudResponseBuilder.buildGetEdgeResponse(RelationshipSchemaValidator
- .validateOutgoingPayload(version, edge),
- version);
- }
-
- public String getEdges(String version, String type, Map<String, String> filter)
- throws CrudException {
- RelationshipSchemaValidator.validateType(version, type);
- List<Edge> items = dao.getEdges(type,
- RelationshipSchemaValidator.resolveCollectionfilter(version, type, filter));
- return CrudResponseBuilder.buildGetEdgesResponse(items, version);
- }
-
- public String getVertex(String version, String id, String type) throws CrudException {
- type = OxmModelValidator.resolveCollectionType(version, type);
- Vertex vertex = dao.getVertex(id, type);
- List<Edge> edges = dao.getVertexEdges(id);
- return CrudResponseBuilder.buildGetVertexResponse(OxmModelValidator
- .validateOutgoingPayload(version, vertex), edges,
- version);
- }
-
- public String getVertices(String version, String type, Map<String, String> filter)
- throws CrudException {
- type = OxmModelValidator.resolveCollectionType(version, type);
- List<Vertex> items = dao.getVertices(type,
- OxmModelValidator.resolveCollectionfilter(version, type, filter));
- return CrudResponseBuilder.buildGetVerticesResponse(items, version);
- }
-
- @PreDestroy
- protected void preShutdown() {
- timer.cancel();
-
- }
+ @Override
+ protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event =
+ GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
+ }
+ @Override
+ protected void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException {
+ // Get the edge type
+ String type = null;
+ try {
+ Edge edge = daoForGet.getEdge(id);
+ type = edge.getType();
+ }
+ catch (CrudException ex) {
+ // Likely the client is trying to delete an edge which isn't present. Just swallow the exception
+ // and let the bulk request fail via the normal path.
+ }
+
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
+ }
+ private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+ GraphEventEnvelope response = sendAndWait(event);
+ responseHandler.handleBulkEventResponse(event, response);
+ return response.getBody();
+ }
}
\ No newline at end of file