Update license date and text
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / graph / impl / AbstractTinkerpopChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.graph.impl;
22
23 import java.io.IOException;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.NoSuchElementException;
30 import java.util.Optional;
31 import java.util.Set;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.Vector;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.stream.Stream;
37 import java.util.stream.StreamSupport;
38
39 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
40 import org.apache.tinkerpop.gremlin.structure.Direction;
41 import org.apache.tinkerpop.gremlin.structure.Edge;
42 import org.apache.tinkerpop.gremlin.structure.Graph;
43 import org.apache.tinkerpop.gremlin.structure.Property;
44 import org.apache.tinkerpop.gremlin.structure.Vertex;
45 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
46 import org.onap.aai.champcore.ChampTransaction;
47 import org.onap.aai.champcore.NoOpTinkerPopTransaction;
48 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
49 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
50 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
51 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
52 import org.onap.aai.champcore.exceptions.ChampTransactionException;
53 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
54 import org.onap.aai.champcore.model.ChampObject;
55 import org.onap.aai.champcore.model.ChampPartition;
56 import org.onap.aai.champcore.model.ChampRelationship;
57 import org.onap.aai.champcore.model.ChampSchema;
58 import org.onap.aai.champcore.model.fluent.partition.CreateChampPartitionable;
59 import org.onap.aai.champcore.transform.TinkerpopChampformer;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 import com.fasterxml.jackson.core.JsonProcessingException;
64 import com.fasterxml.jackson.databind.ObjectMapper;
65
66 public abstract class AbstractTinkerpopChampGraph extends AbstractValidatingChampGraph {
67         
// Class-wide logger used for the warnings emitted by the query iterators below.
68         private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTinkerpopChampGraph.class);
// Shared transformer used to unmarshal TinkerPop vertices/edges into Champ model objects.
69         private static final TinkerpopChampformer TINKERPOP_CHAMPFORMER = new TinkerpopChampformer();
// Shared Jackson mapper, exposed to the rest of the class through getObjectMapper().
70         private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
71
// Shutdown flag: created as "false" in the constructor and read through isShutdown().
// NOTE(review): in the visible code the reference is only assigned in the constructor, so
// 'volatile' looks redundant and 'final' may be intended - confirm against the rest of the class.
72         private volatile AtomicBoolean isShutdown;
73
74         protected AbstractTinkerpopChampGraph(Map<String, Object> properties) {
75           super(properties);
76           
77           isShutdown = new AtomicBoolean(false);
78       Runtime.getRuntime().addShutdownHook(shutdownHook);
79         }
80         
81         private static final TinkerpopChampformer getChampformer() {
82                 return TINKERPOP_CHAMPFORMER;
83         }
84
85         private static final ObjectMapper getObjectMapper() {
86                 return OBJECT_MAPPER;
87         }
88
/**
 * Restricts the supplied traversal by label/type; the mechanism is left to subclasses.
 * Within this class it is invoked by the query methods after all property filters have
 * been added to the traversal, and its return value is not used by those callers.
 *
 * @param query the traversal to restrict
 * @param type  the value the caller supplied under the reserved "type" query key
 * @return NOTE(review): presumably the traversal with the label filter applied - confirm
 *         against the implementing subclasses
 */
89         public abstract GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type);
90         
91         public ChampTransaction openTransaction() {
92           
93           return new TinkerpopTransaction(getGraph());
94         }         
95         
96         private Vertex writeVertex(ChampObject object, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampMarshallingException {
97                 final Vertex vertex;
98                 
99                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
100                 
101                 if (object.getKey().isPresent()) {
102                         final Iterator<Vertex> vertexIter = graphInstance.vertices(object.getKey().get());
103
104                         if (vertexIter.hasNext()) {
105                                 vertex = vertexIter.next();
106                         } else throw new ChampObjectNotExistsException();
107                 } else {
108                         vertex = graphInstance.addVertex(object.getType());
109                 }
110
111                 for (Entry<String, Object> property : object.getProperties().entrySet()) {
112
113                         if (property.getValue() instanceof List) {
114                                 for (Object subPropertyValue : (List<?>) property.getValue()) {
115                                         vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
116                                 }
117                         } else if (property.getValue() instanceof Set) {
118                                 for (Object subPropertyValue : (Set<?>) property.getValue()) {
119                                         vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
120                                 }
121                         } else {
122                                 vertex.property(property.getKey(), property.getValue());
123                         }
124                 }
125
126                 return vertex;
127         }
128         
129         private Vertex replaceVertex(ChampObject object, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampMarshallingException {
130                 Vertex vertex;
131                 
132                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
133                 
134                 if (object.getKey().isPresent()) {
135                         final Iterator<Vertex> vertexIter = graphInstance.vertices(object.getKey().get());
136
137                         if (vertexIter.hasNext()) {
138                                 vertex = vertexIter.next();
139                         } else throw new ChampObjectNotExistsException();
140                 } else {
141                         throw new ChampObjectNotExistsException();
142                 }
143
144                 //clear all the existing properties
145                 Iterator<VertexProperty<Object>> it = vertex.properties();
146                 while (it.hasNext()) {
147                         it.next().remove();
148                 }
149                 
150                 for (Entry<String, Object> property : object.getProperties().entrySet()) {
151
152                         if (property.getValue() instanceof List) {
153                                 for (Object subPropertyValue : (List<?>) property.getValue()) {
154                                         vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
155                                 }
156                         } else if (property.getValue() instanceof Set) {
157                                 for (Object subPropertyValue : (Set<?>) property.getValue()) {
158                                         vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
159                                 }
160                         } else {
161                                 vertex.property(property.getKey(), property.getValue());                                
162                         }
163                 }
164
165                 return vertex;
166         }
167
168         private Edge writeEdge(ChampRelationship relationship, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampMarshallingException {
169
170                 final Vertex source = writeVertex(relationship.getSource(), transaction);
171                 final Vertex target = writeVertex(relationship.getTarget(), transaction);
172                 final Edge edge;
173
174                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
175
176                 if (relationship.getKey().isPresent()) {
177                         final Iterator<Edge> edgeIter = graphInstance.edges(relationship.getKey().get());
178
179                         if (edgeIter.hasNext()) {
180                                 edge = edgeIter.next();
181                         } else throw new ChampRelationshipNotExistsException();
182                 } else {
183                         edge = source.addEdge(relationship.getType(), target);
184                 }
185
186                 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
187                         edge.property(property.getKey(), property.getValue());
188                 }
189
190                 return edge;
191         }
192         
193         private Edge replaceEdge(ChampRelationship relationship, ChampTransaction tx) throws  ChampRelationshipNotExistsException, ChampMarshallingException {
194                 final Edge edge;
195                 Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
196                 
197                 if(!relationship.getSource().getKey().isPresent() || !relationship.getTarget().getKey().isPresent()){
198                         throw new IllegalArgumentException("Invalid source/target");
199                 }
200                 
201                 if (relationship.getKey().isPresent()) {
202                         final Iterator<Edge> edgeIter = graphInstance.edges(relationship.getKey().get());
203
204
205                         if (edgeIter.hasNext()) {
206                                 edge = edgeIter.next();
207                                 //validate if the source/target are the same as before. Throw error if not the same
208                                 if (!edge.outVertex().id().equals(relationship.getSource().getKey().get())
209                                                 || !edge.inVertex().id().equals(relationship.getTarget().getKey().get())) {
210                                         throw new IllegalArgumentException("source/target can't be updated");
211                                 }
212
213                         } else throw new ChampRelationshipNotExistsException();
214                 } else {
215                         throw new ChampRelationshipNotExistsException();
216                 }
217                 
218                 // clear all the existing properties
219                 Iterator<Property<Object>> it = edge.properties();
220                 while (it.hasNext()) {
221                         it.next().remove();
222                 }
223                                 
224                 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
225                         edge.property(property.getKey(), property.getValue());
226                 }
227
228                 return edge;
229         }
230
231
/**
 * Supplies the TinkerPop graph that backs this instance. Within this class it is used by
 * openTransaction() to construct each new TinkerpopTransaction.
 *
 * @return the underlying graph, provided by the concrete subclass
 */
232         protected abstract Graph getGraph();
233
234         
235
236         private Thread shutdownHook = new Thread() {
237                 @Override
238                 public void run() {
239                         try {
240                                 shutdown();
241                         } catch (IllegalStateException e) {
242                                 //Suppress, because shutdown() has already been called
243                         }
244                 }
245         };
246
247         protected boolean isShutdown() {
248                 return isShutdown.get();
249         }
250
251     @Override
252         public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException {
253       return queryObjects(queryParams, Optional.empty());
254     }
255
256         
257         @Override
258         public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException {
259           
260           if (isShutdown()) {
261             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
262           }
263         
264           // If we were not provided a transaction object then automatically open a transaction
265       // now.
266       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
267       
268       // Use the graph instance associated with our transaction.
269       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
270       
271       //If they provided the object key, do this the quick way rather than creating a traversal
272       if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString())) {
273         
274         try {
275           final Optional<ChampObject> object = 
276               retrieveObject(queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString()),
277                              transaction);
278                         
279           if (object.isPresent()) {
280             return Stream.of(object.get());
281           } else {
282             return Stream.empty();
283           }
284         } catch (ChampUnmarshallingException e) {
285           LOGGER.warn("Failed to unmarshall object", e);
286           return Stream.empty();
287         }
288       }
289
290       final GraphTraversal<Vertex, Vertex> query = graphInstance.traversal().V();
291
292       for (Entry<String, Object> filter : queryParams.entrySet()) {      
293         if (filter.getKey().equals(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
294           continue; //For performance reasons, the label is the last thing to be added
295         } else {
296           query.has(filter.getKey(), filter.getValue());
297         }
298       }
299
300       if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
301           hasLabel(query, queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString()));
302       }
303
304       final Iterator<ChampObject> objIter = new Iterator<ChampObject> () {
305         
306         private ChampObject next;
307
308
309           @Override
310           public boolean hasNext() {
311             while (query.hasNext()) {
312               try {
313                 next = getChampformer().unmarshallObject(query.next());
314                 return true;
315               } catch (ChampUnmarshallingException e) {
316                 LOGGER.warn("Failed to unmarshall tinkerpop vertex during query, returning partial results", e);
317               }                                 
318             }
319
320             // If we auto-created the transaction, then commit it now, otherwise it is up to the
321             // caller to decide when and if to do the commit.
322             if(!transaction.isPresent()) {
323               try {
324                 tx.commit(); //Danger ahead if this iterator is not completely consumed
325                              //then the transaction cache will hold stale values
326               } catch (ChampTransactionException e) {
327                 LOGGER.warn("Failed transaction commit due to: " + e.getMessage());
328               } 
329                            
330             }
331
332             next = null;
333             return false;
334           }
335
336           @Override
337           public ChampObject next() {
338             if (next == null) {
339               throw new NoSuchElementException();
340             }
341                                 
342                         return next;
343                   }
344       };
345
346       return StreamSupport.stream(Spliterators.spliteratorUnknownSize(objIter, 
347                                                                       Spliterator.ORDERED | Spliterator.NONNULL), 
348                                                                       false);
349     }
350
351     @Override
352     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException {
353       return retrieveObject(key, Optional.empty());
354     }
355     
356         @Override
357         public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException {
358           
359           if (isShutdown()) {
360             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
361           }
362           
363           // If we were not provided a transaction object then automatically open a transaction
364       // now.
365       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
366           
367           // Use the graph instance associated with our transaction.
368           Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
369           
370           final Iterator<Vertex> vertices = graphInstance.vertices(key);
371           final Optional<ChampObject> optionalObject;
372
373           if (!vertices.hasNext()) {
374             optionalObject = Optional.empty();
375             
376           } else {
377             optionalObject = Optional.of(getChampformer().unmarshallObject(vertices.next()));
378           }
379           
380           // If we auto-created the transaction, then commit it now, otherwise it is up to the
381           // caller to decide when and if to do the commit.
382           if(!transaction.isPresent()) {
383             tx.commit();
384           }
385           
386           return optionalObject;
387         }
388
389     @Override
390     public Stream<ChampRelationship> retrieveRelationships(ChampObject source) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException {
391       return retrieveRelationships(source, Optional.empty());
392     }
393     
394         @Override
395         public Stream<ChampRelationship> retrieveRelationships(ChampObject source, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException {
396             
397           if (isShutdown()) {
398             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
399           }
400                 
401           // If we were not provided a transaction object then automatically open a transaction
402       // now.
403       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
404       
405       // Use the graph instance associated with our transaction.
406       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
407       
408           final Vertex sourceVertex;
409
410           try {
411             sourceVertex = graphInstance.vertices(source.getKey().get()).next();
412           
413           } catch (NoSuchElementException e) {  
414             
415             // If we auto-created the transaction, then try to roll it back now, otherwise it is 
416             // up to the caller to decide when and if to do so.
417             if(!transaction.isPresent()) {
418                         tx.rollback();
419             }
420             
421                 throw new ChampObjectNotExistsException();
422       }
423
424           final Iterator<Edge> edges = sourceVertex.edges(Direction.BOTH);
425           final Iterator<ChampRelationship> relIter = new Iterator<ChampRelationship> () {
426
427             private ChampRelationship next;
428
429             @Override
430             public boolean hasNext() {
431               while (edges.hasNext()) {
432                 try {
433                   next = getChampformer().unmarshallRelationship(edges.next());
434                   return true;
435                 } catch (ChampUnmarshallingException e) {
436                   LOGGER.warn("Failed to unmarshall tinkerpop edge during query, returning partial results", e);
437                 }                                       
438               }
439
440               // If we auto-created the transaction, then commit it now, otherwise it is up to the
441               // caller to decide when and if to do the commit.
442               if(!transaction.isPresent()) {
443                 try {
444             tx.commit();   //Danger ahead if this iterator is not completely
445                            //consumed, then the transaction cache will be stale
446             
447           } catch (ChampTransactionException e) {
448             LOGGER.warn("Failed transaction commit due to: " + e.getMessage());
449           } 
450                              
451               }        
452               next = null;
453               return false;
454             }
455
456             @Override
457             public ChampRelationship next() {
458               if (next == null) {
459                 throw new NoSuchElementException();
460               }
461                                 
462               return next;
463             }
464           };
465
466           return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
467                 relIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
468         }
469
470         
471         @Override
472     public ChampObject doStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
473                 
474           ChampTransaction tx = null;
475           
476           try {
477             
478             // If we were not provided a transaction object then automatically open a transaction
479             // now.
480             tx = getOrCreateTransactionInstance(transaction);
481             
482             // Now, store the object that we were supplied.
483             final Vertex vertex = writeVertex(object, tx);
484             
485             // Only auto-commit this operation if we were NOT provided a transaction context, 
486             // otherwise it is the caller's responsibility to commit the transaction when it
487             // is appropriate to do so.
488             if(!transaction.isPresent()) {
489               tx.commit();
490             }
491                
492             // Marshal the resulting vertex into a ChampObject and return it to the caller.
493             return ChampObject.create()
494                                    .from(object)
495                                    .withKey(vertex.id())
496                                    .build();
497
498           } catch (ChampObjectNotExistsException e) {
499         
500         // Something went wrong.  If we auto-created the transaction, then try to roll it back
501             // now. If we were supplied a transaction context then it is the caller's responsibility 
502             // to decide whether or not to roll it back.
503             if(!transaction.isPresent()) {
504               tx.rollback();
505             }
506                 
507             // Rethrow the exception.
508             throw e;
509           }
510         }
511         
512         @Override
513         public ChampObject doReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
514
515           ChampTransaction tx = null;
516           
517           try {
518                   
519             // If we were not provided a transaction object then automatically open a transaction
520             // now.
521             tx = getOrCreateTransactionInstance(transaction);
522                 
523             final Vertex vertex = replaceVertex(object, tx);
524
525         // Only auto-commit this operation if we were NOT provided a transaction context, 
526         // otherwise it is the caller's responsibility to commit the transaction when it
527         // is appropriate to do so.
528         if(!transaction.isPresent()) {
529           tx.commit();
530         }
531                         
532         // Marshal the resulting vertex into a ChampObject and return it to the caller.
533         return ChampObject.create()
534                                .from(object)
535                                .withKey(vertex.id())
536                                .build();
537                         
538                 } catch (ChampObjectNotExistsException e) {
539                   
540                   // Something went wrong.  If we auto-created the transaction, then try to roll it back
541               // now. If we were supplied a transaction context then it is the caller's responsibility 
542               // to decide whether or not to roll it back.
543               if(!transaction.isPresent()) {
544                 tx.rollback();
545               }
546
547               // Rethrow the exception.
548               throw e;
549                 }
550         }
551
552         @Override
553         public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
554           
555                 if (isShutdown()) {
556                   throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
557                 }
558
559             // If we were not provided a transaction object then automatically open a transaction
560         // now.
561         ChampTransaction tx = getOrCreateTransactionInstance(transaction);
562                 
563             // Use the graph instance associated with our transaction.
564             Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
565               
566                 final Iterator<Vertex> vertex = graphInstance.vertices(key);
567
568                 if (!vertex.hasNext()) {
569                   
570                   // If we auto-created the transaction, then roll it back now, otherwise it
571                   // is up to the caller to make that determination.
572                   if(!transaction.isPresent()) {
573                         tx.rollback();
574                   }
575                   
576                   throw new ChampObjectNotExistsException();
577                 }
578
579                 // Remove the vertex.
580                 vertex.next().remove();
581                 
582                 // If we auto-created the transaction, then commit it now, otherwise it
583                 // is up to the caller to decide if and when to commit.
584                 if(!transaction.isPresent()) {
585                   tx.commit();
586                 }
587         }
588
589     @Override
590     public ChampRelationship doStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) 
591         throws ChampUnmarshallingException, 
592                ChampObjectNotExistsException, 
593                ChampRelationshipNotExistsException, 
594                ChampMarshallingException, 
595                ChampTransactionException  {
596       
597       // If we were not provided a transaction object then automatically open a transaction
598       // now.
599       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
600       
601       try {
602         
603         // Store the edge in the graph.
604         final Edge edge = writeEdge(relationship, tx);
605         
606         // Unmarshal the stored edge into a ChampRelationship object
607         ChampRelationship storedRelationship = getChampformer().unmarshallRelationship(edge);
608         
609         // If we auto-created the transaction, then commit it now, otherwise it
610         // is up to the caller to decide if and when to commit.
611         if(!transaction.isPresent()) {
612           tx.commit();
613         }
614         
615         // Finally, return the result to the caller.
616         return storedRelationship;
617         
618       } catch (ChampObjectNotExistsException | 
619                ChampRelationshipNotExistsException | 
620                ChampUnmarshallingException | 
621                ChampMarshallingException e) {
622         
623         // If we auto-create the transaction, then try to roll it back, otherwise
624         // it is up to the caller to decide when and if to do so.
625         if(!transaction.isPresent()) {
626           tx.rollback();
627         }
628         throw e;
629       }   
630     }
631     
632         
633         @Override
634         public ChampRelationship doReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
635                         throws ChampUnmarshallingException, 
636                                ChampRelationshipNotExistsException, 
637                                ChampMarshallingException, 
638                                ChampTransactionException  {
639
640       // If we were not provided a transaction object then automatically open a transaction
641       // now.
642       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
643       
644       try {
645         final Edge edge = replaceEdge(relationship, tx);
646
647                   ChampRelationship unmarshalledRelationship = getChampformer().unmarshallRelationship(edge);
648
649                   // If we auto-created the transaction, then commit it now, otherwise it
650                   // is up to the caller to decide if and when to commit.
651                   if(!transaction.isPresent()) {
652                           tx.commit();
653                   }
654
655                   return unmarshalledRelationship;
656                         
657       } catch ( ChampRelationshipNotExistsException | ChampUnmarshallingException | ChampMarshallingException e) {
658         
659         // it is up to the caller to decide when and if to do so.
660         if(!transaction.isPresent()) {
661           tx.rollback();
662         }
663
664         throw e;
665       }
666         }  
667
668         @Override
669     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException {
670           return queryRelationships(queryParams, Optional.empty());
671         }
672            
673         @Override
674         public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException {
675           
676           if (isShutdown()) {
677             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
678           }
679
680       // If we were not provided a transaction object then automatically open a transaction
681       // now.
682       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
683       
684       // Use the graph instance associated with our transaction.
685       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
686       
687           // If they provided the relationship key, do this the quick way rather than creating a traversal
688           if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString())) {
689             try {
690               final Optional<ChampRelationship> relationship = retrieveRelationship(queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString()),
691                                                                                     Optional.of(tx));
692                         
693               if (relationship.isPresent()) {
694                 return Stream.of(relationship.get());
695               
696               } else {
697                 return Stream.empty();
698               }
699             } catch (ChampUnmarshallingException e) {
700               
701               LOGGER.warn("Failed to unmarshall relationship", e);
702               return Stream.empty();
703             }
704           }
705          
706           final GraphTraversal<Edge, Edge> query = graphInstance.traversal().E();
707
708           for (Entry<String, Object> filter : queryParams.entrySet()) {
709             if (filter.getKey().equals(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
710               continue; //Add the label last for performance reasons
711             } else {
712               query.has(filter.getKey(), filter.getValue());
713             }
714           }
715
716           if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
717                   hasLabel(query, queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString()));
718           }
719
720           final Iterator<ChampRelationship> objIter = new Iterator<ChampRelationship> () {
721         
722             private ChampRelationship next;
723
724             @Override
725             public boolean hasNext() {
726               while (query.hasNext()) {
727                 try {
728                   next = getChampformer().unmarshallRelationship(query.next());
729                   return true;
730                 } catch (ChampUnmarshallingException e) {
731                   LOGGER.warn("Failed to unmarshall tinkerpop vertex during query, returning partial results", e);
732                 }                                       
733               }
734
735               // If we auto-created the transaction, then commit it now, otherwise it
736               // is up to the caller to decide if and when to commit.
737               if(!transaction.isPresent()) {
738                 try { 
739             tx.commit();  //Danger ahead if this iterator is not completely
740                           //consumed, then the transaction cache will be stale
741             
742           } catch (ChampTransactionException e) {
743             LOGGER.warn("Failed transaction commit due to " + e.getMessage());
744           } 
745                              
746               }
747                                         
748               next = null;
749               return false;
750             }
751
752             @Override
753             public ChampRelationship next() {
754               if (next == null) {
755                 throw new NoSuchElementException();
756               }
757                                 
758               return next;
759             }
760           };
761
762           return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
763                     objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
764         }
765
766     @Override
767     public Optional<ChampRelationship> retrieveRelationship(Object key)
768             throws ChampUnmarshallingException, ChampTransactionException {
769       return retrieveRelationship(key, Optional.empty());
770     }
771     
772     @Override
773     public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction)
774                         throws ChampUnmarshallingException, ChampTransactionException {
775           
776       if (isShutdown()) {
777         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
778       }
779
780       // If we were not provided a transaction object then automatically open a transaction
781       // now.
782       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
783       
784       // Use the graph instance associated with our transaction.
785       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
786       
787       final Iterator<Edge> edge = graphInstance.edges(key);
788       final Optional<ChampRelationship> optionalRelationship;
789
790       if (!edge.hasNext()) {
791         optionalRelationship = Optional.empty();
792       } else {
793         optionalRelationship = Optional.of(getChampformer().unmarshallRelationship(edge.next()));
794       }
795
796       // If we auto-created the transaction, then commit it now, otherwise it
797       // is up to the caller to decide if and when to commit.
798       if(!transaction.isPresent()) {
799         tx.commit();
800       }
801
802       return optionalRelationship;
803         }
804
805     @Override
806         public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
807
808       if (isShutdown()) {
809         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
810       }
811       
812       if (!relationship.getKey().isPresent()) {
813         throw new IllegalArgumentException("Key must be provided when deleting a relationship");
814       }
815
816       // If we were not provided a transaction object then automatically open a transaction
817       // now.
818       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
819       
820       // Use the graph instance associated with our transaction.
821       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
822       
823       final Iterator<Edge> edge = graphInstance.edges(relationship.getKey().get());
824                 
825       if (!edge.hasNext()) {
826         
827         // If we auto-created the transaction, then try to roll it back now, otherwise it
828         // is up to the caller to decide if and when to do so.
829         if(!transaction.isPresent()) {
830           tx.rollback();
831         }
832         
833         throw new ChampRelationshipNotExistsException();
834       }
835       
836           edge.next().remove();
837
838           // If we auto-created the transaction, then commit it now, otherwise it
839           // is up to the caller to decide if and when to commit.
840           if(!transaction.isPresent()) {
841             tx.commit();
842           }
843     }
844
845         
846     @Override
847     public ChampPartition doStorePartition(ChampPartition submittedPartition, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampTransactionException {
848                 
849           if (isShutdown()) {
850             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
851           }
852
853       // If we were not provided a transaction object then automatically open a transaction
854       // now.
855       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
856             
857           try {
858             final HashMap<ChampObject, ChampObject> objectsWithKeys = new HashMap<ChampObject, ChampObject> ();
859             final CreateChampPartitionable storedPartition = ChampPartition.create();
860
861             for (ChampObject champObject : submittedPartition.getChampObjects()) {
862               final Vertex vertex = writeVertex(champObject, tx);
863               objectsWithKeys.put(champObject, ChampObject.create()
864                                                                                                                 .from(champObject)
865                                                                                                                 .withKey(vertex.id())
866                                                                                                                 .build());
867             }
868
869             for (ChampRelationship champRelationship : submittedPartition.getChampRelationships()) {
870               
871               if (!objectsWithKeys.containsKey(champRelationship.getSource())) {
872                 final Vertex vertex = writeVertex(champRelationship.getSource(), tx);
873
874                 objectsWithKeys.put(champRelationship.getSource(), ChampObject.create()
875                                                                                                                      .from(champRelationship.getSource())
876                                                                                                                      .withKey(vertex.id())
877                                                                                                                      .build());
878               }
879
880               if (!objectsWithKeys.containsKey(champRelationship.getTarget())) {
881                 final Vertex vertex = writeVertex(champRelationship.getTarget(), tx);
882
883                 objectsWithKeys.put(champRelationship.getTarget(), ChampObject.create()
884                                                                                                                         .from(champRelationship.getTarget())
885                                                                                                                         .withKey(vertex.id())
886                                                                                                                         .build());
887               }
888
889               final ChampRelationship.Builder relWithKeysBuilder = new ChampRelationship.Builder(objectsWithKeys.get(champRelationship.getSource()),
890                                                                                                                                                                                          objectsWithKeys.get(champRelationship.getTarget()),
891                                                                                                                                                                                          champRelationship.getType());
892
893               if (champRelationship.getKey().isPresent()) {
894                 relWithKeysBuilder.key(champRelationship.getKey().get());
895               }
896                                 
897               relWithKeysBuilder.properties(champRelationship.getProperties());
898
899               final Edge edge = writeEdge(relWithKeysBuilder.build(), tx);
900
901               storedPartition.withRelationship(ChampRelationship.create()
902                                                                                                                                 .from(champRelationship)
903                                                                                                                                 .withKey(edge.id())
904                                                                                                                                 .build());
905             }
906
907             for (ChampObject object : objectsWithKeys.values()) {
908               storedPartition.withObject(object);
909             }
910
911             // If we auto-created the transaction, then commit it now, otherwise it
912         // is up to the caller to decide if and when to commit.
913         if(!transaction.isPresent()) {
914           tx.commit();
915         }
916             
917         return storedPartition.build();
918                         
919       } catch (ChampObjectNotExistsException | ChampMarshallingException e) {
920         
921         // If we auto-created the transaction, then try to roll it back now, otherwise it
922         // is up to the caller to decide if and when to do so.
923         if(!transaction.isPresent()) {
924           tx.rollback();
925         }  
926
927                 throw e;
928       }
929         }
930
931     @Override
932         public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
933       
934       if (isShutdown()) {
935         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
936       }
937
938       // If we were not provided a transaction object then automatically open a transaction
939       // now.
940       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
941       
942       // Use the graph instance associated with our transaction.
943       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
944       
945       for (ChampObject champObject : graph.getChampObjects()) {
946         try {
947           final Object vertexId = champObject.getKey().get();
948           final Iterator<Vertex> vertex = graphInstance.vertices(vertexId);
949         
950           if (vertex.hasNext()) {
951             vertex.next().remove();
952           }
953                 } catch (NoSuchElementException e) {
954                   
955                   // If we auto-created the transaction, then try to roll it back now, otherwise it
956               // is up to the caller to decide if and when to do so.
957               if(!transaction.isPresent()) {
958                 tx.rollback();
959               } 
960
961               throw new IllegalArgumentException("Must pass a key to delete an object");
962             }
963       }
964
965       for (ChampRelationship champRelationship : graph.getChampRelationships()) {
966         try {
967           final Iterator<Edge> edge = graphInstance.edges(champRelationship.getKey().get());
968                 
969           if (edge.hasNext()) {
970             edge.next().remove();
971           }
972         } catch (NoSuchElementException e) {
973           
974           // If we auto-created the transaction, then try to roll it back now, otherwise it
975           // is up to the caller to decide if and when to do so.
976           if(!transaction.isPresent()) {
977             tx.rollback();
978           }
979
980           throw new IllegalArgumentException("Must pass a key to delete a relationship");
981         }
982       }
983
984       // If we auto-created the transaction, then commit it now, otherwise it
985       // is up to the caller to decide if and when to commit.
986       if(!transaction.isPresent()) {
987         tx.commit();
988       }
989         }
990
991         @Override
992         public void shutdown() {
993
994                 if (isShutdown.compareAndSet(false, true)) {
995                   super.shutdown();
996                         try {
997                                 getGraph().close();
998                         } catch (Throwable t) {
999                                 LOGGER.error("Exception while shutting down graph", t);
1000                         }
1001                 } else {
1002                         throw new IllegalStateException("Cannot call shutdown() after shutdown() was already initiated");
1003                 }
1004         }
1005
1006         @Override
1007         public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1008                 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
1009
1010                 if (getGraph().features().graph().variables().supportsVariables()) {
1011                         try {
1012                                 getGraph().variables().set("schema", getObjectMapper().writeValueAsBytes(schema));
1013                         } catch (JsonProcessingException e) {
1014                                 throw new RuntimeException(e);
1015                         }
1016                 } else {
1017                         super.storeSchema(schema);
1018                 }
1019         }
1020
1021         @Override
1022         public ChampSchema retrieveSchema() {
1023                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveSchema() after shutdown has been initiated");
1024
1025                 if (getGraph().features().graph().variables().supportsVariables()) {
1026                         final Optional<byte[]> schema = getGraph().variables().get("schema");
1027
1028                         if (schema.isPresent()) {
1029                                 try {
1030                                         return getObjectMapper().readValue(schema.get(), ChampSchema.class);
1031                                 } catch (IOException e) {
1032                                         throw new RuntimeException(e);
1033                                 }
1034                         }
1035                 }
1036
1037                 return super.retrieveSchema();
1038         }
1039
1040         @Override
1041         public void deleteSchema() {
1042                 if (isShutdown()) throw new IllegalStateException("Cannot call deleteSchema() after shutdown has been initiated");
1043
1044                 if (getGraph().features().graph().variables().supportsVariables()) {
1045                         getGraph().variables().remove("schema");
1046                 } else {
1047                         super.deleteSchema();
1048                 }
1049         }
1050         
1051         public ChampTransaction getOrCreateTransactionInstance(Optional<ChampTransaction> transaction) {
1052           
1053           ChampTransaction tx = null;
1054           
1055       // If we were not provided a transaction object then automatically open a transaction
1056       // now.
1057       if(!transaction.isPresent()) {
1058         
1059           tx = new TinkerpopTransaction(getGraph());
1060           
1061       } else {
1062         tx = transaction.get();
1063       }
1064       
1065       return tx;
1066         }
1067 }