2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.crud.service;
23 import java.io.IOException;
24 import java.security.NoSuchAlgorithmException;
25 import java.text.SimpleDateFormat;
26 import java.util.ArrayList;
27 import java.util.HashMap;
28 import java.util.List;
30 import java.util.Timer;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import javax.annotation.PreDestroy;
40 import javax.ws.rs.core.EntityTag;
41 import javax.ws.rs.core.HttpHeaders;
42 import javax.ws.rs.core.Response.Status;
43 import org.apache.commons.lang3.tuple.ImmutablePair;
44 import org.onap.aai.cl.api.LogFields;
45 import org.onap.aai.cl.api.Logger;
46 import org.onap.aai.cl.eelf.LoggerFactory;
47 import org.onap.aai.cl.mdc.MdcContext;
48 import org.onap.aai.cl.mdc.MdcOverride;
49 import org.onap.aai.event.api.EventConsumer;
50 import org.onap.aai.event.api.EventPublisher;
51 import org.onap.aai.restclient.client.OperationResult;
52 import org.onap.crud.dao.GraphDao;
53 import org.onap.crud.entity.Edge;
54 import org.onap.crud.entity.Vertex;
55 import org.onap.crud.event.GraphEvent;
56 import org.onap.crud.event.GraphEvent.GraphEventOperation;
57 import org.onap.crud.event.GraphEventEdge;
58 import org.onap.crud.event.GraphEventVertex;
59 import org.onap.crud.event.envelope.GraphEventEnvelope;
60 import org.onap.crud.event.response.GraphEventResponseHandler;
61 import org.onap.crud.exception.CrudException;
62 import org.onap.crud.logging.CrudServiceMsgs;
63 import org.onap.crud.parser.CrudResponseBuilder;
64 import org.onap.crud.util.CrudProperties;
65 import org.onap.crud.util.CrudServiceConstants;
66 import org.onap.crud.util.CrudServiceUtil;
67 import org.onap.crud.util.etag.EtagGenerator;
68 import org.onap.schema.OxmModelValidator;
69 import org.onap.schema.RelationshipSchemaValidator;
71 import com.google.gson.JsonElement;
/**
 * Graph data service that performs write operations asynchronously: each request is wrapped in a
 * {@code GraphEvent}, published through an {@code EventPublisher}, and the calling thread then
 * waits (up to a configurable timeout) for the matching response envelope.
 */
73 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
// Wait time in milliseconds for an asynchronous response. Static, so shared by every instance;
// it is (re)assigned each time a service instance is constructed.
75 private static Integer requestTimeOut;
// Publisher used to put request events on the outbound queue.
77 private EventPublisher asyncRequestPublisher;
// Fallback request timeout (milliseconds) used when the configured value cannot be parsed.
81 public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
// Fallback delay/period handed to the response-consumer timer when the configured value
// cannot be parsed.
82 private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
84 private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
85 private static Logger metricsLogger =
86 LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
// Log fields attached to the metrics log entry written after each completed request.
87 private static LogFields okFields = new LogFields();
// Computes the hashes used as entity tags for vertex and edge responses.
88 private EtagGenerator etagGenerator;
// NOTE(review): presumably sits inside a static initializer whose braces are not visible in
// this excerpt - confirm against the full file.
91 okFields.setField(Status.OK, Status.OK.toString());
// Interprets response envelopes and builds the payload returned to the caller.
94 private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
96 public static Integer getRequestTimeOut() {
97 return requestTimeOut;
/**
 * Convenience constructor that passes the same DAO as both {@code dao} and {@code daoForGet}
 * to the four-argument constructor.
 *
 * @param dao                   DAO used for graph access
 * @param asyncRequestPublisher publisher for outbound request events
 * @param asyncResponseConsumer consumer polled for response events
 * @throws CrudException            propagated from the delegated constructor
 * @throws NoSuchAlgorithmException propagated from the delegated constructor
 */
public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
        EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
    this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
}
/**
 * Creates the service, reads its timing configuration and starts the timer task that polls
 * for asynchronous responses.
 *
 * <p>Both configuration values fall back to their defaults when they cannot be parsed; each
 * fallback is logged so that a misconfiguration is visible.
 *
 * @param dao                   DAO used for graph access
 * @param daoForGet             DAO stored in {@code daoForGet}
 * @param asyncRequestPublisher publisher for outbound request events
 * @param asyncResponseConsumer consumer handed to the response-polling timer task
 * @throws CrudException            declared by this constructor; not raised directly here
 * @throws NoSuchAlgorithmException declared by this constructor; not raised directly here
 */
public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
        EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
    super();
    this.dao = dao;
    this.daoForGet = daoForGet;

    requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
    try {
        requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
    } catch (NumberFormatException ex) {
        // Leave it as the default, but record that the configured value was unusable.
        logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Unable to parse "
                + CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT + " (" + ex.getMessage()
                + "); using default of " + DEFAULT_REQUEST_TIMEOUT + " ms");
    }

    Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
    try {
        responsePollInterval =
                Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
    } catch (Exception ex) {
        logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
                + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
    }

    // Start the Response Consumer timer
    CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(
            asyncResponseConsumer, new GraphEventUpdater()
    );
    timer = new Timer("crudAsyncResponseConsumer-1");
    timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);

    this.asyncRequestPublisher = asyncRequestPublisher;
    this.etagGenerator = new EtagGenerator();

    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
}
141 public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
142 private volatile GraphEventEnvelope graphEventEnvelope;
143 private volatile CountDownLatch latch = new CountDownLatch(1);
146 public GraphEventEnvelope call() throws TimeoutException {
148 // Wait until graphEvent is available
149 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
150 } catch (InterruptedException e) {
152 if (this.graphEventEnvelope != null) {
153 return this.graphEventEnvelope;
155 throw new TimeoutException();
158 return this.graphEventEnvelope;
161 public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
162 this.graphEventEnvelope = eventEnvelope;
167 private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
169 long startTimeInMs = System.currentTimeMillis();
170 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
171 MdcOverride override = new MdcOverride();
172 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
174 String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
176 // publish to request queue
178 asyncRequestPublisher.sendSync(eventEnvelopeJson);
179 } catch (Exception e) {
180 throw new CrudException(
181 "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(),
182 Status.INTERNAL_SERVER_ERROR);
185 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
187 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
188 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
189 + " , transaction-id: " + event.getTransactionId() + " , operation: "
190 + event.getOperation().toString());
192 ExecutorService executor =
193 Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
194 CollectGraphResponse collector = new CollectGraphResponse();
195 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
196 GraphEventEnvelope response;
197 Future<GraphEventEnvelope> future = executor.submit(collector);
199 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
201 } catch (InterruptedException | ExecutionException | TimeoutException e) {
202 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
203 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
204 "Request timed out for transactionId: " + event.getTransactionId());
206 throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
207 + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
209 // Kill the thread as the work is completed
210 executor.shutdownNow();
212 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
213 "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
214 + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
220 public ImmutablePair<EntityTag, String> addVertex(String version, String type, VertexPayload payload)
221 throws CrudException {
222 // Validate the incoming payload
223 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
224 vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
225 // Create graph request event
226 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
227 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
229 GraphEventEnvelope response = sendAndWait(event);
233 entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
234 } catch (IOException e) {
235 throw new CrudException(e);
237 String responsePayload = responseHandler.handleVertexResponse(version, event, response);
239 return new ImmutablePair<>(entityTag, responsePayload);
243 public ImmutablePair<EntityTag, String> addEdge(String version, String type, EdgePayload payload)
244 throws CrudException {
245 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
247 // Create graph request event
249 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
251 GraphEventEnvelope response = sendAndWait(event);
255 entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
256 } catch (IOException e) {
257 throw new CrudException(e);
259 String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
261 return new ImmutablePair<>(entityTag, responsePayload);
265 public ImmutablePair<EntityTag, String> updateVertex(String version, String id, String type, VertexPayload payload)
266 throws CrudException {
267 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
268 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
269 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
271 GraphEventEnvelope response = sendAndWait(event);
275 entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
276 } catch (IOException e) {
277 throw new CrudException(e);
279 String responsePayload = responseHandler.handleVertexResponse(version, event, response);
281 return new ImmutablePair<>(entityTag, responsePayload);
285 public ImmutablePair<EntityTag, String> patchVertex(String version, String id, String type, VertexPayload payload)
286 throws CrudException {
287 OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
288 new HashMap<String, String>());
289 Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
290 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
291 payload.getProperties(), existingVertex);
292 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
293 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
295 GraphEventEnvelope response = sendAndWait(event);
299 entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
300 } catch (IOException e) {
301 throw new CrudException(e);
303 String responsePayload = responseHandler.handleVertexResponse(version, event, response);
305 return new ImmutablePair<>(entityTag, responsePayload);
309 public String deleteVertex(String version, String id, String type) throws CrudException {
310 type = OxmModelValidator.resolveCollectionType(version, type);
311 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
312 .vertex(new GraphEventVertex(id, version, type, null)).build();
314 GraphEventEnvelope response = sendAndWait(event);
315 return responseHandler.handleDeletionResponse(event, response);
319 public String deleteEdge(String version, String id, String type) throws CrudException {
320 RelationshipSchemaValidator.validateType(version, type);
321 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
322 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
324 GraphEventEnvelope response = sendAndWait(event);
325 return responseHandler.handleDeletionResponse(event, response);
329 public ImmutablePair<EntityTag, String> updateEdge(String version, String id, String type, EdgePayload payload)
330 throws CrudException {
331 OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
332 Edge edge = Edge.fromJson(operationResult.getResult());
333 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
335 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
336 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
338 GraphEventEnvelope response = sendAndWait(event);
342 entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
343 } catch (IOException e) {
344 throw new CrudException(e);
346 String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
348 return new ImmutablePair<>(entityTag, responsePayload);
352 public ImmutablePair<EntityTag, String> patchEdge(String version, String id, String type, EdgePayload payload)
353 throws CrudException {
354 OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
355 Edge edge = Edge.fromJson(operationResult.getResult());
356 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
357 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
358 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
360 GraphEventEnvelope response = sendAndWait(event);
364 entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
365 } catch (IOException e) {
366 throw new CrudException(e);
368 String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
370 return new ImmutablePair<>(entityTag, responsePayload);
// Shutdown hook for this service. The method body is not visible in this excerpt.
// NOTE(review): presumably stops the response-consumer timer started by the constructor - confirm.
374 protected void preShutdown() {
378 private Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
379 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
380 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
381 event.setDbTransactionId(dbTransId);
382 GraphEvent response = publishEvent(event);
383 return response.getVertex().toVertex();
386 private Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
387 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
388 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
389 event.setDbTransactionId(dbTransId);
390 GraphEvent response = publishEvent(event);
391 return response.getVertex().toVertex();
394 private void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
395 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
396 .vertex(new GraphEventVertex(id, version, type, null)).build();
397 event.setDbTransactionId(dbTransId);
401 private Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
403 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
404 event.setDbTransactionId(dbTransId);
405 GraphEvent response = publishEvent(event);
406 return response.getEdge().toEdge();
409 private Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
411 GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
412 event.setDbTransactionId(dbTransId);
413 GraphEvent response = publishEvent(event);
414 return response.getEdge().toEdge();
417 private void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException {
421 Edge edge = daoForGet.getEdge(id);
422 type = edge.getType();
424 catch (CrudException ex) {
425 // Likely the client is trying to delete an edge which isn't present. Just swallow the exception
426 // and let the bulk request fail via the normal path.
429 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
430 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
431 event.setDbTransactionId(dbTransId);
// Applies a bulk payload of vertex ("objects") and edge ("relationships") operations inside one
// DAO transaction, in four ordered passes: edge deletes, vertex deletes, vertex add/modify/patch,
// then edge add/modify/patch.
//
// Every element of the payload must be a JSON object with exactly two entries: the first entry's
// value is the operation name (compared case-insensitively), the second entry holds the item and
// its key is the label under which the result is stored. Any other shape raises a CrudException
// with BAD_REQUEST.
//
// For edge "add" operations, a source or target beginning with "$" refers to the label of a vertex
// processed earlier in this same request and is replaced by that vertex's URL.
//
// The transaction is committed after the last pass and rolled back in the exception handlers.
// Returns the string built by CrudResponseBuilder.buildUpsertBulkResponse from the collected
// vertices and edges.
//
// NOTE(review): several short lines of this method are not visible in this excerpt (block braces,
// the trailing arguments of mergeHeaderInFoToPayload, local declarations); the comments below
// describe only what is visible.
436 public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException {
// Results keyed by the label taken from each payload item.
437 HashMap<String, Vertex> vertices = new HashMap<>();
438 HashMap<String, Edge> edges = new HashMap<>();
440 String txId = dao.openTransaction();
443 // Step 1. Handle edge deletes (must happen before vertex deletes)
444 for (JsonElement v : payload.getRelationships()) {
445 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
446 v.getAsJsonObject().entrySet());
448 if (entries.size() != 2) {
// NOTE(review): the exception message is empty, so the client gets no hint about what is wrong.
449 throw new CrudException("", Status.BAD_REQUEST);
451 Map.Entry<String, JsonElement> opr = entries.get(0);
452 Map.Entry<String, JsonElement> item = entries.get(1);
453 EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
455 if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
456 deleteBulkEdge(edgePayload.getId(), version, txId);
460 // Step 2: Handle vertex deletes
461 for (JsonElement v : payload.getObjects()) {
462 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
463 v.getAsJsonObject().entrySet());
465 if (entries.size() != 2) {
466 throw new CrudException("", Status.BAD_REQUEST);
469 Map.Entry<String, JsonElement> opr = entries.get(0);
470 Map.Entry<String, JsonElement> item = entries.get(1);
471 VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
473 if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
474 String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType());
475 deleteBulkVertex(vertexPayload.getId(), version, type, txId);
479 // Step 3: Handle vertex add/modify (must happen before edge adds)
480 for (JsonElement v : payload.getObjects()) {
481 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
482 v.getAsJsonObject().entrySet());
484 if (entries.size() != 2) {
485 throw new CrudException("", Status.BAD_REQUEST);
487 Map.Entry<String, JsonElement> opr = entries.get(0);
488 Map.Entry<String, JsonElement> item = entries.get(1);
489 VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
// "add": validate as a new vertex (null id), publish the create, and record the outgoing form.
492 if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
493 vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
495 Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(),
496 vertexPayload.getProperties());
497 Vertex persistedVertex = addBulkVertex(validatedVertex, version, txId);
498 Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
499 vertices.put(item.getKey(), outgoingVertex);
// "modify": validate against the supplied id and publish an update.
503 else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) {
504 vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
506 Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version,
507 vertexPayload.getType(), vertexPayload.getProperties());
508 Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId);
509 Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
510 vertices.put(item.getKey(), outgoingVertex);
// "patch": requires id and type; the stored vertex is read and the patch validated against it.
514 else if (opr.getValue().getAsString().equalsIgnoreCase("patch")) {
515 if ( (vertexPayload.getId() == null) || (vertexPayload.getType() == null) ) {
516 throw new CrudException("id and type must be specified for patch request", Status.BAD_REQUEST);
519 vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
522 OperationResult existingVertexOpResult = dao.getVertex(vertexPayload.getId(), OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()), version, new HashMap<String, String>());
523 Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
524 Vertex validatedVertex = OxmModelValidator.validateIncomingPatchPayload(vertexPayload.getId(),
525 version, vertexPayload.getType(), vertexPayload.getProperties(), existingVertex);
526 Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId);
527 Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
528 vertices.put(item.getKey(), outgoingVertex);
532 // Step 4: Handle edge add/modify
533 for (JsonElement v : payload.getRelationships()) {
534 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
535 v.getAsJsonObject().entrySet());
537 if (entries.size() != 2) {
538 throw new CrudException("", Status.BAD_REQUEST);
540 Map.Entry<String, JsonElement> opr = entries.get(0);
541 Map.Entry<String, JsonElement> item = entries.get(1);
542 EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
545 if (opr.getValue().getAsString().equalsIgnoreCase("add")
546 || opr.getValue().getAsString().equalsIgnoreCase("modify")
547 || opr.getValue().getAsString().equalsIgnoreCase("patch")) {
550 if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
551 // Fix the source/destination
// A "$label" source/target is swapped for the URL of the vertex stored under that label in step 3.
552 if (edgePayload.getSource().startsWith("$")) {
553 Vertex source = vertices.get(edgePayload.getSource().substring(1));
554 if (source == null) {
555 throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1),
556 Status.INTERNAL_SERVER_ERROR);
559 .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get());
561 if (edgePayload.getTarget().startsWith("$")) {
562 Vertex target = vertices.get(edgePayload.getTarget().substring(1));
563 if (target == null) {
564 throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1),
565 Status.INTERNAL_SERVER_ERROR);
568 .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get());
571 // If the type isn't set, resolve it based on the source and target vertex types
572 if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) {
573 edgePayload.setType(CrudServiceUtil.determineEdgeType(edgePayload, version));
576 validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),edgePayload);
577 persistedEdge = addBulkEdge(validatedEdge, version, txId);
578 } else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) {
// The existing edge is read through the transaction-aware overload of getEdge.
579 Edge edge = dao.getEdge(edgePayload.getId(), txId);
581 // If the type isn't set, fall back to the type of the existing edge
582 if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) {
583 edgePayload.setType(edge.getType());
586 validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload);
587 persistedEdge = updateBulkEdge(validatedEdge, version, txId);
// Remaining case of the enclosing condition: "patch".
589 if (edgePayload.getId() == null) {
590 throw new CrudException("id must be specified for patch request", Status.BAD_REQUEST);
592 Edge existingEdge = dao.getEdge(edgePayload.getId(), txId);
594 // If the type isn't set, fall back to the type of the existing edge
595 if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) {
596 edgePayload.setType(existingEdge.getType());
599 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(existingEdge, version, edgePayload);
600 persistedEdge = updateBulkEdge(patchedEdge, version, txId);
604 Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge);
605 edges.put(item.getKey(), outgoingEdge);
609 // commit transaction
610 dao.commitTransaction(txId);
611 } catch (CrudException ex) {
612 dao.rollbackTransaction(txId);
614 } catch (Exception ex) {
615 dao.rollbackTransaction(txId);
// Final safety net: roll back if the transaction is still open at this point.
618 if (dao.transactionExists(txId)) {
619 dao.rollbackTransaction(txId);
623 return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload);
626 private GraphEvent publishEvent(GraphEvent event) throws CrudException {
627 GraphEventEnvelope response = sendAndWait(event);
628 responseHandler.handleBulkEventResponse(event, response);
629 return response.getBody();