Fix Gizmo Sonar bugs
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.crud.service;
22
23 import com.google.gson.JsonElement;
24 import java.io.IOException;
25 import java.security.NoSuchAlgorithmException;
26 import java.text.SimpleDateFormat;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Timer;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import javax.annotation.PreDestroy;
41 import javax.ws.rs.core.EntityTag;
42 import javax.ws.rs.core.HttpHeaders;
43 import javax.ws.rs.core.Response.Status;
44 import org.apache.commons.lang3.tuple.ImmutablePair;
45 import org.onap.aai.cl.api.LogFields;
46 import org.onap.aai.cl.api.Logger;
47 import org.onap.aai.cl.eelf.LoggerFactory;
48 import org.onap.aai.cl.mdc.MdcContext;
49 import org.onap.aai.cl.mdc.MdcOverride;
50 import org.onap.aai.event.api.EventConsumer;
51 import org.onap.aai.event.api.EventPublisher;
52 import org.onap.aai.restclient.client.OperationResult;
53 import org.onap.crud.dao.GraphDao;
54 import org.onap.crud.entity.Edge;
55 import org.onap.crud.entity.Vertex;
56 import org.onap.crud.event.GraphEvent;
57 import org.onap.crud.event.GraphEvent.GraphEventOperation;
58 import org.onap.crud.event.GraphEventEdge;
59 import org.onap.crud.event.GraphEventVertex;
60 import org.onap.crud.event.envelope.GraphEventEnvelope;
61 import org.onap.crud.event.response.GraphEventResponseHandler;
62 import org.onap.crud.exception.CrudException;
63 import org.onap.crud.logging.CrudServiceMsgs;
64 import org.onap.crud.parser.CrudResponseBuilder;
65 import org.onap.crud.util.CrudProperties;
66 import org.onap.crud.util.CrudServiceConstants;
67 import org.onap.crud.util.CrudServiceUtil;
68 import org.onap.crud.util.etag.EtagGenerator;
69 import org.onap.schema.OxmModelValidator;
70 import org.onap.schema.RelationshipSchemaValidator;
71
72 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
73
    /**
     * Timeout, in milliseconds, applied while waiting for an async response. Assigned by the constructor
     * (default first, then the configured property if it parses) and exposed through getRequestTimeOut().
     */
    private static Integer requestTimeOut;

    /** Publisher that sendAndWait uses to put request events on the request queue. */
    private EventPublisher asyncRequestPublisher;

    /** Timer that repeatedly runs the async response consumer task; cancelled in preShutdown(). */
    private Timer timer;

    /** Request timeout (milliseconds) used when the configured value cannot be parsed. */
    public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
    /** Delay and period (milliseconds) for the response consumer timer when the configured value cannot be parsed. */
    private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;

    private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
    private static Logger metricsLogger =
            LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
    /** Log fields attached to the elapsed-time metrics entry written by sendAndWait. */
    private static LogFields okFields = new LogFields();
    /** Computes the hashes that are wrapped into EntityTag values for vertex and edge responses. */
    private EtagGenerator etagGenerator;

    static {
        // Pre-populate the shared metrics fields with the OK status.
        okFields.setField(Status.OK, Status.OK.toString());
    }

    /** Interprets async responses and turns them into the payloads returned by the CRUD methods. */
    private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
94
95     public static Integer getRequestTimeOut() {
96         return requestTimeOut;
97     }
98
99     public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
100             EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
101         this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
102     }
103
104     public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
105             EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
106
107         super();
108         this.dao = dao;
109         this.daoForGet = daoForGet;
110
111         requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
112         try {
113             requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
114         } catch (NumberFormatException ex) {
115             // Leave it as the default
116         }
117
118         Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
119         try {
120             responsePollInterval =
121                     Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
122         } catch (Exception ex) {
123             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
124                     + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
125         }
126
127         // Start the Response Consumer timer
128         CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(
129                 asyncResponseConsumer, new GraphEventUpdater()
130                 );
131         timer = new Timer("crudAsyncResponseConsumer-1");
132         timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
133
134         this.asyncRequestPublisher = asyncRequestPublisher;
135         this.etagGenerator = new EtagGenerator();
136
137         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
138     }
139
140     public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
141         private volatile GraphEventEnvelope graphEventEnvelope;
142         private volatile CountDownLatch latch = new CountDownLatch(1);
143
144         @Override
145         public GraphEventEnvelope call() throws TimeoutException {
146             try {
147                 // Wait until graphEvent is available
148                 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
149             } catch (InterruptedException e) {
150                 latch.countDown();
151                 // Restore interrupted state...
152                 Thread.currentThread().interrupt();
153                 if (this.graphEventEnvelope != null) {
154                     return this.graphEventEnvelope;
155                 } else {
156                     throw new TimeoutException();
157                 }
158             }
159             return this.graphEventEnvelope;
160         }
161
162         public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
163             this.graphEventEnvelope = eventEnvelope;
164             latch.countDown();
165         }
166     }
167
168     private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
169
170         long startTimeInMs = System.currentTimeMillis();
171         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
172         MdcOverride override = new MdcOverride();
173         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
174
175         String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
176
177         // publish to request queue
178         try {
179             asyncRequestPublisher.sendSync(eventEnvelopeJson);
180         } catch (Exception e) {
181             throw new CrudException(
182                     "Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(),
183                     Status.INTERNAL_SERVER_ERROR);
184         }
185
186         logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
187
188         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
189                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
190                 + " , transaction-id: " + event.getTransactionId() + " , operation: "
191                 + event.getOperation().toString());
192
193         ExecutorService executor =
194                 Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
195         CollectGraphResponse collector = new CollectGraphResponse();
196         CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
197         GraphEventEnvelope response;
198         Future<GraphEventEnvelope> future = executor.submit(collector);
199         try {
200             response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
201
202         } catch (InterruptedException | ExecutionException | TimeoutException e) {
203             CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
204             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
205                     "Request timed out for transactionId: " + event.getTransactionId());
206             future.cancel(true);
207             if (e instanceof InterruptedException) {
208                 // Restore interrupted state...
209                 Thread.currentThread().interrupt();
210             }
211             throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
212                     + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
213         } finally {
214             // Kill the thread as the work is completed
215             executor.shutdownNow();
216         }
217         metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
218                 "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
219                         + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
220                         + " ms");
221         return response;
222     }
223
224     @Override
225     public ImmutablePair<EntityTag, String> addVertex(String version, String type, VertexPayload payload)
226             throws CrudException {
227         // Validate the incoming payload
228         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
229         vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
230         // Create graph request event
231         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
232                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
233
234         GraphEventEnvelope response = sendAndWait(event);
235
236         EntityTag entityTag;
237         try {
238             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
239         } catch (IOException e) {
240             throw new CrudException(e);
241         }
242         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
243
244         return new ImmutablePair<>(entityTag, responsePayload);
245     }
246
247     @Override
248     public ImmutablePair<EntityTag, String> addEdge(String version, String type, EdgePayload payload)
249             throws CrudException {
250         Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
251
252         // Create graph request event
253         GraphEvent event =
254                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
255
256         GraphEventEnvelope response = sendAndWait(event);
257
258         EntityTag entityTag;
259         try {
260             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
261         } catch (IOException e) {
262             throw new CrudException(e);
263         }
264         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
265
266         return new ImmutablePair<>(entityTag, responsePayload);
267     }
268
269     @Override
270     public ImmutablePair<EntityTag, String> updateVertex(String version, String id, String type, VertexPayload payload)
271             throws CrudException {
272         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
273         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
274                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
275
276         GraphEventEnvelope response = sendAndWait(event);
277
278         EntityTag entityTag;
279         try {
280             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
281         } catch (IOException e) {
282             throw new CrudException(e);
283         }
284         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
285
286         return new ImmutablePair<>(entityTag, responsePayload);
287     }
288
289     @Override
290     public ImmutablePair<EntityTag, String> patchVertex(String version, String id, String type, VertexPayload payload)
291             throws CrudException {
292         OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
293                 new HashMap<String, String>());
294         Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
295         Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
296                 payload.getProperties(), existingVertex);
297         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
298                 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
299
300         GraphEventEnvelope response = sendAndWait(event);
301
302         EntityTag entityTag;
303         try {
304             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
305         } catch (IOException e) {
306             throw new CrudException(e);
307         }
308         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
309
310         return new ImmutablePair<>(entityTag, responsePayload);
311     }
312
313     @Override
314     public String deleteVertex(String version, String id, String type) throws CrudException {
315         type = OxmModelValidator.resolveCollectionType(version, type);
316         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
317                 .vertex(new GraphEventVertex(id, version, type, null)).build();
318
319         GraphEventEnvelope response = sendAndWait(event);
320         return responseHandler.handleDeletionResponse(event, response);
321     }
322
323     @Override
324     public String deleteEdge(String version, String id, String type) throws CrudException {
325         RelationshipSchemaValidator.validateType(version, type);
326         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
327                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
328
329         GraphEventEnvelope response = sendAndWait(event);
330         return responseHandler.handleDeletionResponse(event, response);
331     }
332
333     @Override
334     public ImmutablePair<EntityTag, String> updateEdge(String version, String id, String type, EdgePayload payload)
335             throws CrudException {
336         OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
337         Edge edge = Edge.fromJson(operationResult.getResult());
338         Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
339
340         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
341                 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
342
343         GraphEventEnvelope response = sendAndWait(event);
344
345         EntityTag entityTag;
346         try {
347             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
348         } catch (IOException e) {
349             throw new CrudException(e);
350         }
351         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
352
353         return new ImmutablePair<>(entityTag, responsePayload);
354     }
355
356     @Override
357     public ImmutablePair<EntityTag, String> patchEdge(String version, String id, String type, EdgePayload payload)
358             throws CrudException {
359         OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
360         Edge edge = Edge.fromJson(operationResult.getResult());
361         Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
362         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
363                 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
364
365         GraphEventEnvelope response = sendAndWait(event);
366
367         EntityTag entityTag;
368         try {
369             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
370         } catch (IOException e) {
371             throw new CrudException(e);
372         }
373         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
374
375         return new ImmutablePair<>(entityTag, responsePayload);
376     }
377
378     @PreDestroy
379     protected void preShutdown() {
380         timer.cancel();
381     }
382
383     private Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
384         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
385                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
386         event.setDbTransactionId(dbTransId);
387         GraphEvent response = publishEvent(event);
388         return response.getVertex().toVertex();
389     }
390
391     private Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
392         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
393                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
394         event.setDbTransactionId(dbTransId);
395         GraphEvent response = publishEvent(event);
396         return response.getVertex().toVertex();
397     }
398
399     private void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
400         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
401                 .vertex(new GraphEventVertex(id, version, type, null)).build();
402         event.setDbTransactionId(dbTransId);
403         publishEvent(event);
404     }
405
406     private Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
407         GraphEvent event =
408                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
409         event.setDbTransactionId(dbTransId);
410         GraphEvent response = publishEvent(event);
411         return response.getEdge().toEdge();
412     }
413
414     private Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
415         GraphEvent event =
416                 GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
417         event.setDbTransactionId(dbTransId);
418         GraphEvent response = publishEvent(event);
419         return response.getEdge().toEdge();
420     }
421
422     private void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException {
423         // Get the edge type
424         String type = null;
425         try {
426             Edge edge = daoForGet.getEdge(id);
427             type = edge.getType();
428         }
429         catch (CrudException ex) {
430             // Likely the client is trying to delete an edge which isn't present.  Just swallow the exception
431             // and let the bulk request fail via the normal path.
432         }
433
434         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
435                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
436         event.setDbTransactionId(dbTransId);
437         publishEvent(event);
438     }
439
440     @Override
441     public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException {
442         HashMap<String, Vertex> vertices = new HashMap<>();
443         HashMap<String, Edge> edges = new HashMap<>();
444
445         String txId = dao.openTransaction();
446
447         try {
448             // Step 1. Handle edge deletes (must happen before vertex deletes)
449             for (JsonElement v : payload.getRelationships()) {
450                 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
451                         v.getAsJsonObject().entrySet());
452
453                 if (entries.size() != 2) {
454                     throw new CrudException("", Status.BAD_REQUEST);
455                 }
456                 Map.Entry<String, JsonElement> opr = entries.get(0);
457                 Map.Entry<String, JsonElement> item = entries.get(1);
458                 EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
459
460                 if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
461                     deleteBulkEdge(edgePayload.getId(), version, txId);
462                 }
463             }
464
465             // Step 2: Handle vertex deletes
466             for (JsonElement v : payload.getObjects()) {
467                 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
468                         v.getAsJsonObject().entrySet());
469
470                 if (entries.size() != 2) {
471                     throw new CrudException("", Status.BAD_REQUEST);
472                 }
473
474                 Map.Entry<String, JsonElement> opr = entries.get(0);
475                 Map.Entry<String, JsonElement> item = entries.get(1);
476                 VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
477
478                 if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
479                     String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType());
480                     deleteBulkVertex(vertexPayload.getId(), version, type, txId);
481                 }
482             }
483
484             // Step 3: Handle vertex add/modify (must happen before edge adds)
485             for (JsonElement v : payload.getObjects()) {
486                 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
487                         v.getAsJsonObject().entrySet());
488
489                 if (entries.size() != 2) {
490                     throw new CrudException("", Status.BAD_REQUEST);
491                 }
492                 Map.Entry<String, JsonElement> opr = entries.get(0);
493                 Map.Entry<String, JsonElement> item = entries.get(1);
494                 VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
495
496                 // Add vertex
497                 if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
498                     vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
499                             headers, true));
500                     Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(),
501                             vertexPayload.getProperties());
502                     Vertex persistedVertex = addBulkVertex(validatedVertex, version, txId);
503                     Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
504                     vertices.put(item.getKey(), outgoingVertex);
505                 }
506
507                 // Update vertex
508                 else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) {
509                     vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
510                             headers, false));
511                     Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version,
512                             vertexPayload.getType(), vertexPayload.getProperties());
513                     Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId);
514                     Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
515                     vertices.put(item.getKey(), outgoingVertex);
516                 }
517
518                 // Patch vertex
519                 else if (opr.getValue().getAsString().equalsIgnoreCase("patch")) {
520                     if ( (vertexPayload.getId() == null) || (vertexPayload.getType() == null) ) {
521                         throw new CrudException("id and type must be specified for patch request", Status.BAD_REQUEST);
522                     }
523
524                     vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
525                             headers, false));
526
527                     OperationResult existingVertexOpResult = dao.getVertex(vertexPayload.getId(), OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()), version, new HashMap<String, String>());
528                     Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
529                     Vertex validatedVertex = OxmModelValidator.validateIncomingPatchPayload(vertexPayload.getId(),
530                             version, vertexPayload.getType(), vertexPayload.getProperties(), existingVertex);
531                     Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId);
532                     Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
533                     vertices.put(item.getKey(), outgoingVertex);
534                 }
535             }
536
537             // Step 4: Handle edge add/modify
538             for (JsonElement v : payload.getRelationships()) {
539                 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
540                         v.getAsJsonObject().entrySet());
541
542                 if (entries.size() != 2) {
543                     throw new CrudException("", Status.BAD_REQUEST);
544                 }
545                 Map.Entry<String, JsonElement> opr = entries.get(0);
546                 Map.Entry<String, JsonElement> item = entries.get(1);
547                 EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
548
549                 // Add/Update edge
550                 if (opr.getValue().getAsString().equalsIgnoreCase("add")
551                         || opr.getValue().getAsString().equalsIgnoreCase("modify")
552                         || opr.getValue().getAsString().equalsIgnoreCase("patch")) {
553                     Edge validatedEdge;
554                     Edge persistedEdge;
555                     if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
556                         // Fix the source/destination
557                         if (edgePayload.getSource().startsWith("$")) {
558                             Vertex source = vertices.get(edgePayload.getSource().substring(1));
559                             if (source == null) {
560                                 throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1),
561                                         Status.INTERNAL_SERVER_ERROR);
562                             }
563                             edgePayload
564                             .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get());
565                         }
566                         if (edgePayload.getTarget().startsWith("$")) {
567                             Vertex target = vertices.get(edgePayload.getTarget().substring(1));
568                             if (target == null) {
569                                 throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1),
570                                         Status.INTERNAL_SERVER_ERROR);
571                             }
572                             edgePayload
573                             .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get());
574                         }
575
576                         // If the type isn't set, resolve it based on on the sourece and target vertex types
577                         if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) {
578                             edgePayload.setType(CrudServiceUtil.determineEdgeType(edgePayload, version));
579                         }
580                         
581                         validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),edgePayload);
582                         persistedEdge = addBulkEdge(validatedEdge, version, txId);
583                     } else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) {
584                         Edge edge = dao.getEdge(edgePayload.getId(), txId);
585
586                         // If the type isn't set, resolve it based on on the sourece and target vertex types
587                         if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) {
588                             edgePayload.setType(edge.getType());
589                         }
590
591                         validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload);
592                         persistedEdge = updateBulkEdge(validatedEdge, version, txId);
593                     } else {
594                         if (edgePayload.getId() == null) {
595                             throw new CrudException("id must be specified for patch request", Status.BAD_REQUEST);
596                         }
597                         Edge existingEdge = dao.getEdge(edgePayload.getId(), txId);
598
599                         // If the type isn't set, resolve it based on on the sourece and target vertex types
600                         if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) {
601                             edgePayload.setType(existingEdge.getType());
602                         }
603
604                         Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(existingEdge, version, edgePayload);
605                         persistedEdge = updateBulkEdge(patchedEdge, version, txId);
606                     }
607
608
609                     Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge);
610                     edges.put(item.getKey(), outgoingEdge);
611                 }
612             }
613
614             // commit transaction
615             dao.commitTransaction(txId);
616         } catch (CrudException ex) {
617             dao.rollbackTransaction(txId);
618             throw ex;
619         } catch (Exception ex) {
620             dao.rollbackTransaction(txId);
621             throw ex;
622         } finally {
623             if (dao.transactionExists(txId)) {
624                 dao.rollbackTransaction(txId);
625             }
626         }
627
628         return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload);
629     }
630
631     private GraphEvent publishEvent(GraphEvent event) throws CrudException {
632         GraphEventEnvelope response = sendAndWait(event);
633         responseHandler.handleBulkEventResponse(event, response);
634         return response.getBody();
635     }
636 }