X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fcrud%2Fservice%2FCrudAsyncGraphDataService.java;h=8d147ebed7aa1018464208396baba50225b605b8;hb=3bc6a702f2d3d8710c7aaa94cdc8c0ccf3deb759;hp=d825d79518938a2687d19e3fd0773314f2bf7936;hpb=eabf0a08faa6581c88c2e4ca5be56d4630891046;p=aai%2Fgizmo.git diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java index d825d79..8d147eb 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java @@ -1,16 +1,15 @@ /** * ============LICENSE_START======================================================= - * Gizmo + * org.onap.aai * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. - * Copyright © 2017 Amdocs - * All rights reserved. + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,529 +17,424 @@ * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. */ package org.onap.crud.service; -import com.att.ecomp.event.api.EventConsumer; -import com.att.ecomp.event.api.EventPublisher; -import com.google.gson.JsonElement; - +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Timer; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.PreDestroy; +import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.onap.aai.cl.api.LogFields; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.cl.mdc.MdcOverride; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; +import org.onap.aai.restclient.client.OperationResult; import org.onap.crud.dao.GraphDao; import org.onap.crud.entity.Edge; import org.onap.crud.entity.Vertex; import org.onap.crud.event.GraphEvent; import org.onap.crud.event.GraphEvent.GraphEventOperation; -import org.onap.crud.event.GraphEvent.GraphEventResult; import org.onap.crud.event.GraphEventEdge; import org.onap.crud.event.GraphEventVertex; +import org.onap.crud.event.envelope.GraphEventEnvelope; +import org.onap.crud.event.response.GraphEventResponseHandler; import org.onap.crud.exception.CrudException; import org.onap.crud.logging.CrudServiceMsgs; -import org.onap.crud.parser.CrudResponseBuilder; +import org.onap.crud.parser.EdgePayload; +import org.onap.crud.parser.VertexPayload; import org.onap.crud.util.CrudProperties; import org.onap.crud.util.CrudServiceConstants; -import org.onap.crud.util.CrudServiceUtil; -import org.onap.schema.OxmModelValidator; -import org.onap.schema.RelationshipSchemaValidator; - -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import javax.annotation.PreDestroy; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.Response.Status; +import org.onap.crud.util.etag.EtagGenerator; +import org.onap.schema.validation.OxmModelValidator; +import org.onap.schema.validation.RelationshipSchemaValidator; public class CrudAsyncGraphDataService extends AbstractGraphDataService { - private static Integer requestTimeOut; - - private EventPublisher asyncRequestPublisher; + private static Integer requestTimeOut; - private Timer timer; + private EventPublisher asyncRequestPublisher; - public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000; - private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000; + private Timer timer; - private static Logger logger = LoggerFactory.getInstance() - .getLogger(CrudAsyncGraphDataService.class.getName()); - private static Logger metricsLogger = LoggerFactory.getInstance() - .getMetricsLogger(CrudAsyncGraphDataService.class.getName()); - private static LogFields OK_FIELDS = new LogFields(); + public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000; + private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000; - static { - OK_FIELDS.setField(Status.OK, Status.OK.toString()); - } + private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName()); + private static Logger metricsLogger = + LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName()); + private static LogFields okFields = new LogFields(); + private EtagGenerator etagGenerator; - public static Integer getRequestTimeOut() { - return requestTimeOut; - } + static { + okFields.setField(Status.OK, Status.OK.toString()); + } - public CrudAsyncGraphDataService(GraphDao dao, - EventPublisher asyncRequestPublisher, - EventConsumer asyncResponseConsumer) throws CrudException { + private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler(); - super(dao); - - requestTimeOut = DEFAULT_REQUEST_TIMEOUT; - try { - requestTimeOut - = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT)); - } catch (NumberFormatException ex) { - // Leave it as the default + public static Integer getRequestTimeOut() { + return requestTimeOut; } - Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL; - try { - responsePollInterval = Integer - .parseInt(CrudProperties - .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL)); - } catch (Exception ex) { - logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse " - + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL - + " error: " + ex.getMessage()); + public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher, + EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException { + this(dao, dao, asyncRequestPublisher, asyncResponseConsumer); } - // Start the Response Consumer timer - CrudAsyncResponseConsumer crudAsyncResponseConsumer - = new CrudAsyncResponseConsumer(asyncResponseConsumer); - timer = new Timer("crudAsyncResponseConsumer-1"); - timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); + public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher, + EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException { - this.asyncRequestPublisher = asyncRequestPublisher; - - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "CrudAsyncGraphDataService initialized SUCCESSFULLY!"); - } + super(); + this.dao = dao; + this.daoForGet = daoForGet; - public class CollectGraphResponse implements Callable { - private volatile GraphEvent graphEvent; - private volatile CountDownLatch latch = new CountDownLatch(1); + requestTimeOut = DEFAULT_REQUEST_TIMEOUT; + try { + requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT)); + } catch (NumberFormatException ex) { + // Leave it as the default + } - @Override - public GraphEvent call() throws TimeoutException { - try { - // Wait until graphEvent is available - latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - latch.countDown(); - if (this.graphEvent != null) { - return this.graphEvent; - } else { - throw new TimeoutException(); + Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL; + try { + responsePollInterval = + Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL)); + } catch (Exception ex) { + logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse " + + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage()); } - } - return this.graphEvent; + + // Start the Response Consumer timer + CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer( + asyncResponseConsumer, new GraphEventUpdater() + ); + timer = new Timer("crudAsyncResponseConsumer-1"); + timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); + + this.asyncRequestPublisher = asyncRequestPublisher; + this.etagGenerator = new EtagGenerator(); + + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!"); } - public void populateGraphEvent(GraphEvent event) { - this.graphEvent = event; - latch.countDown(); + public class CollectGraphResponse implements Callable { + private volatile GraphEventEnvelope graphEventEnvelope; + private volatile CountDownLatch latch = new CountDownLatch(1); + + @Override + public GraphEventEnvelope call() throws TimeoutException { + try { + // Wait until graphEvent is available + latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + latch.countDown(); + if (this.graphEventEnvelope != null) { + return this.graphEventEnvelope; + } else { + throw new TimeoutException(); + } + } + return this.graphEventEnvelope; + } + + public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) { + this.graphEventEnvelope = eventEnvelope; + latch.countDown(); + } } - } - private GraphEvent sendAndWait(GraphEvent event) throws CrudException { + private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException { - long startTimeInMs = System.currentTimeMillis(); - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - MdcOverride override = new MdcOverride(); - override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); + long startTimeInMs = System.currentTimeMillis(); + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + MdcOverride override = new MdcOverride(); + override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); - // publish to request queue - try { - asyncRequestPublisher.sendSync(event.toJson()); - } catch (Exception e) { - throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR); - } - - logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson()); + String eventEnvelopeJson = new GraphEventEnvelope(event).toJson(); + + // publish to request queue + try { + asyncRequestPublisher.sendSync(eventEnvelopeJson); + } catch (Exception e) { + throw new CrudException( + "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), + Status.INTERNAL_SERVER_ERROR); + } + + logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson); - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString()); - - ExecutorService executor = Executors - .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); - CollectGraphResponse collector = new CollectGraphResponse(); - CrudAsyncGraphEventCache.put(event.getTransactionId(), collector); - GraphEvent response; - Future future = executor.submit(collector); - try { - response = future.get(requestTimeOut, TimeUnit.MILLISECONDS); - - } catch (InterruptedException | ExecutionException | TimeoutException e) { - CrudAsyncGraphEventCache.invalidate(event.getTransactionId()); - logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, - "Request timed out for transactionId: " + event.getTransactionId()); - future.cancel(true); - throw new CrudException("Timed out , transactionId: " + event.getTransactionId() - + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR); - } finally { - //Kill the thread as the work is completed - executor.shutdownNow(); + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString()); + + ExecutorService executor = + Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); + CollectGraphResponse collector = new CollectGraphResponse(); + CrudAsyncGraphEventCache.put(event.getTransactionId(), collector); + GraphEventEnvelope response; + Future future = executor.submit(collector); + try { + response = future.get(requestTimeOut, TimeUnit.MILLISECONDS); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + CrudAsyncGraphEventCache.invalidate(event.getTransactionId()); + logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, + "Request timed out for transactionId: " + event.getTransactionId()); + future.cancel(true); + throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR); + } finally { + // Kill the thread as the work is completed + executor.shutdownNow(); + } + metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override, + "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: " + + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs) + + " ms"); + return response; } - metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override, - "Total elapsed time for operation: " + event.getOperation().toString() - + " , transactionId: " + event.getTransactionId() + " is " - + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms"); - return response; - } - - public String addVertex(String version, String type, VertexPayload payload) throws CrudException { - // Validate the incoming payload - Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, - type, payload.getProperties()); - // Create graph request event - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, - response.getVertex().toVertex()), version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + + @Override + public ImmutablePair addVertex(String version, String type, VertexPayload payload) + throws CrudException { + // Validate the incoming payload + Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties()); + vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type); + // Create graph request event + GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleVertexResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } - } - - public String addEdge(String version, String type, EdgePayload payload) throws CrudException { - Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload); - // Create graph request event - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .edge(GraphEventEdge.fromEdge(edge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public ImmutablePair addEdge(String version, String type, EdgePayload payload) + throws CrudException { + Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload); + + // Create graph request event + GraphEvent event = + GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleEdgeResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } - } - - public String updateVertex(String version, String id, String type, VertexPayload payload) - throws CrudException { - Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, - type, payload.getProperties()); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + + @Override + public ImmutablePair updateVertex(String version, String id, String type, VertexPayload payload) + throws CrudException { + Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties()); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleVertexResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } - } - - public String patchVertex(String version, String id, String type, VertexPayload payload) - throws CrudException { - Vertex existingVertex - = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type)); - Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, - type, payload.getProperties(), - existingVertex); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public ImmutablePair patchVertex(String version, String id, String type, VertexPayload payload) + throws CrudException { + OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, + new HashMap()); + Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version); + Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type, + payload.getProperties(), existingVertex); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleVertexResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } - } - - public String deleteVertex(String version, String id, String type) throws CrudException { - type = OxmModelValidator.resolveCollectionType(version, type); - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .vertex(new GraphEventVertex(id, version, type, null)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return ""; - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String deleteVertex(String version, String id, String type) throws CrudException { + type = OxmModelValidator.resolveCollectionType(version, type); + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .vertex(new GraphEventVertex(id, version, type, null)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleDeletionResponse(event, response); + } + + @Override + public String deleteEdge(String version, String id, String type) throws CrudException { + RelationshipSchemaValidator.validateType(version, type); + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleDeletionResponse(event, response); + } + + @Override + public ImmutablePair updateEdge(String version, String id, String type, EdgePayload payload) + throws CrudException { + OperationResult operationResult = dao.getEdge(id, type, new HashMap()); + Edge edge = Edge.fromJson(operationResult.getResult()); + Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload); + + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleEdgeResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } - } - - public String deleteEdge(String version, String id, String type) throws CrudException { - RelationshipSchemaValidator.validateType(version, type); - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return ""; - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public ImmutablePair patchEdge(String version, String id, String type, EdgePayload payload) + throws CrudException { + OperationResult operationResult = dao.getEdge(id, type, new HashMap()); + Edge edge = Edge.fromJson(operationResult.getResult()); + Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + + EntityTag entityTag; + try { + entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge())); + } catch (IOException e) { + throw new CrudException(e); + } + String responsePayload = responseHandler.handleEdgeResponse(version, event, response); + + return new ImmutablePair<>(entityTag, responsePayload); } - } - - public String updateEdge(String version, String id, String type, EdgePayload payload) - throws CrudException { - Edge edge = dao.getEdge(id, type); - Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, - payload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, - response.getEdge().toEdge()), version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @PreDestroy + protected void preShutdown() { + timer.cancel(); } - } - - public String patchEdge(String version, String id, String type, EdgePayload payload) - throws CrudException { - Edge edge = dao.getEdge(id, type); - Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, - payload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getVertex().toVertex(); } - } + @Override + protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getVertex().toVertex(); + } - @PreDestroy - protected void preShutdown() { - timer.cancel(); + @Override + protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .vertex(new GraphEventVertex(id, version, type, null)).build(); + event.setDbTransactionId(dbTransId); + publishEvent(event); + } - } + @Override + protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + GraphEvent event = + GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getEdge().toEdge(); + } - @Override - public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException { - HashMap vertices = new HashMap(); - HashMap edges = new HashMap(); - String txId = dao.openTransaction(); - - try { - // Handle vertices - for (JsonElement v : payload.getObjects()) { - List> entries = new ArrayList>( - v.getAsJsonObject().entrySet()); + @Override + protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + GraphEvent event = + GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getEdge().toEdge(); + } - if (entries.size() != 2) { - throw new CrudException("", Status.BAD_REQUEST); - } - Map.Entry opr = entries.get(0); - Map.Entry item = entries.get(1); - - VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); - - if (opr.getValue().getAsString().equalsIgnoreCase("add") - || opr.getValue().getAsString().equalsIgnoreCase("modify")) { - Vertex validatedVertex; - GraphEvent event; - - if (opr.getValue().getAsString().equalsIgnoreCase("add")) { - vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), - headers, true)); - // Publish add-vertex event - validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(), - vertexPayload.getProperties()); - event = GraphEvent.builder(GraphEventOperation.CREATE) - .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build(); - } else { - vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), - headers, false)); - // Publish update-vertex event - validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version, - vertexPayload.getType(), vertexPayload.getProperties()); - event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build(); - } - - event.setDbTransactionId(txId); - GraphEvent response = publishEvent(event); - Vertex persistedVertex = response.getVertex().toVertex(); - Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); - vertices.put(item.getKey(), outgoingVertex); - } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { - // Publish delete-vertex event - String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()); - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .vertex(new GraphEventVertex(vertexPayload.getId(), version, type, null)).build(); - event.setDbTransactionId(txId); - publishEvent(event); - } - } - - // Handle Edges - for (JsonElement v : payload.getRelationships()) { - List> entries = new ArrayList>( - v.getAsJsonObject().entrySet()); - - if (entries.size() != 2) { - throw new CrudException("", Status.BAD_REQUEST); + @Override + protected void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException { + // Get the edge type + String type = null; + try { + Edge edge = daoForGet.getEdge(id); + type = edge.getType(); } - Map.Entry opr = entries.get(0); - Map.Entry item = entries.get(1); - - EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); - - if (opr.getValue().getAsString().equalsIgnoreCase("add") - || opr.getValue().getAsString().equalsIgnoreCase("modify")) { - Edge validatedEdge; - Edge persistedEdge; - if (opr.getValue().getAsString().equalsIgnoreCase("add")) { - // Fix the source/destination - if (edgePayload.getSource().startsWith("$")) { - Vertex source = vertices.get(edgePayload.getSource().substring(1)); - if (source == null) { - throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1), - Status.INTERNAL_SERVER_ERROR); - } - edgePayload - .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get()); - } - if (edgePayload.getTarget().startsWith("$")) { - Vertex target = vertices.get(edgePayload.getTarget().substring(1)); - if (target == null) { - throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1), - Status.INTERNAL_SERVER_ERROR); - } - edgePayload - .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get()); - } - validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(), - edgePayload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); - event.setDbTransactionId(txId); - GraphEvent response = publishEvent(event); - persistedEdge = response.getEdge().toEdge(); - } else { - Edge edge = dao.getEdge(edgePayload.getId(), edgePayload.getType(), txId); - validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); - event.setDbTransactionId(txId); - GraphEvent response = publishEvent(event); - persistedEdge = response.getEdge().toEdge(); - } - - Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge); - edges.put(item.getKey(), outgoingEdge); - } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { - RelationshipSchemaValidator.validateType(version, edgePayload.getType()); - // Publish delete-vertex event - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .edge(new GraphEventEdge(edgePayload.getId(), version, edgePayload.getType(), null, null, null)).build(); - event.setDbTransactionId(txId); - publishEvent(event); + catch (CrudException ex) { + // Likely the client is trying to delete an edge which isn't present. Just swallow the exception + // and let the bulk request fail via the normal path. } - } - - // commit transaction - dao.commitTransaction(txId); - } catch (CrudException ex) { - dao.rollbackTransaction(txId); - throw ex; - } catch (Exception ex) { - dao.rollbackTransaction(txId); - throw ex; - } finally { - if (dao.transactionExists(txId)) { - dao.rollbackTransaction(txId); - } + + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); + event.setDbTransactionId(dbTransId); + publishEvent(event); + } + + private GraphEvent publishEvent(GraphEvent event) throws CrudException { + GraphEventEnvelope response = sendAndWait(event); + responseHandler.handleBulkEventResponse(event, response); + return response.getBody(); } - - return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload); - } - - private GraphEvent publishEvent(GraphEvent event) throws CrudException { - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); - } - - return response; - } - - private void logSuccessResponse(GraphEvent event, GraphEvent response) { - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "Event response received: " + response.getObjectType() + " with key: " - + response.getObjectKey() + " , transaction-id: " + response.getTransactionId() - + " , operation: " + event.getOperation().toString() + " , result: " - + response.getResult()); - } - - private void logErrorResponse(GraphEvent event, GraphEvent response) { - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "Event response received: " + response.getObjectType() + " with key: " - + response.getObjectKey() + " , transaction-id: " + response.getTransactionId() - + " , operation: " + event.getOperation().toString() + " , result: " - + response.getResult() + " , error: " + response.getErrorMessage()); - } - } \ No newline at end of file