9a45e914cf8dd703151bf3d8336ecf0847650d14
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / graph / impl / AbstractTinkerpopChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champcore.graph.impl;
23
24 import java.io.IOException;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.NoSuchElementException;
31 import java.util.Optional;
32 import java.util.Set;
33 import java.util.Spliterator;
34 import java.util.Spliterators;
35 import java.util.Vector;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.stream.Stream;
38 import java.util.stream.StreamSupport;
39
40 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
41 import org.apache.tinkerpop.gremlin.structure.Direction;
42 import org.apache.tinkerpop.gremlin.structure.Edge;
43 import org.apache.tinkerpop.gremlin.structure.Graph;
44 import org.apache.tinkerpop.gremlin.structure.Property;
45 import org.apache.tinkerpop.gremlin.structure.Vertex;
46 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
47 import org.onap.aai.champcore.ChampTransaction;
48 import org.onap.aai.champcore.NoOpTinkerPopTransaction;
49 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
50 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
51 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
52 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
53 import org.onap.aai.champcore.exceptions.ChampTransactionException;
54 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
55 import org.onap.aai.champcore.model.ChampObject;
56 import org.onap.aai.champcore.model.ChampPartition;
57 import org.onap.aai.champcore.model.ChampRelationship;
58 import org.onap.aai.champcore.model.ChampSchema;
59 import org.onap.aai.champcore.model.fluent.partition.CreateChampPartitionable;
60 import org.onap.aai.champcore.transform.TinkerpopChampformer;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import com.fasterxml.jackson.core.JsonProcessingException;
65 import com.fasterxml.jackson.databind.ObjectMapper;
66
67 public abstract class AbstractTinkerpopChampGraph extends AbstractValidatingChampGraph {
68         
69         private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTinkerpopChampGraph.class);
70         private static final TinkerpopChampformer TINKERPOP_CHAMPFORMER = new TinkerpopChampformer();
71         private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
72
73         private volatile AtomicBoolean isShutdown;
74
75         protected AbstractTinkerpopChampGraph(Map<String, Object> properties) {
76           super(properties);
77           
78           isShutdown = new AtomicBoolean(false);
79       Runtime.getRuntime().addShutdownHook(shutdownHook);
80         }
81         
82         private static final TinkerpopChampformer getChampformer() {
83                 return TINKERPOP_CHAMPFORMER;
84         }
85
86         private static final ObjectMapper getObjectMapper() {
87                 return OBJECT_MAPPER;
88         }
89
90         public abstract GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type);
91         
92         public ChampTransaction openTransaction() {
93           
94           return new TinkerpopTransaction(getGraph());
95         }         
96         
97         private Vertex writeVertex(ChampObject object, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampMarshallingException {
98                 final Vertex vertex;
99                 
100                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
101                 
102                 if (object.getKey().isPresent()) {
103                         final Iterator<Vertex> vertexIter = graphInstance.vertices(object.getKey().get());
104
105                         if (vertexIter.hasNext()) {
106                                 vertex = vertexIter.next();
107                         } else throw new ChampObjectNotExistsException();
108                 } else {
109                         vertex = graphInstance.addVertex(object.getType());
110                 }
111
112                 for (Entry<String, Object> property : object.getProperties().entrySet()) {
113
114                         if (property.getValue() instanceof List) {
115                                 for (Object subPropertyValue : (List<?>) property.getValue()) {
116                                         vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
117                                 }
118                         } else if (property.getValue() instanceof Set) {
119                                 for (Object subPropertyValue : (Set<?>) property.getValue()) {
120                                         vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
121                                 }
122                         } else {
123                                 vertex.property(property.getKey(), property.getValue());
124                         }
125                 }
126
127                 return vertex;
128         }
129         
130         private Vertex replaceVertex(ChampObject object, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampMarshallingException {
131                 Vertex vertex;
132                 
133                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
134                 
135                 if (object.getKey().isPresent()) {
136                         final Iterator<Vertex> vertexIter = graphInstance.vertices(object.getKey().get());
137
138                         if (vertexIter.hasNext()) {
139                                 vertex = vertexIter.next();
140                         } else throw new ChampObjectNotExistsException();
141                 } else {
142                         throw new ChampObjectNotExistsException();
143                 }
144
145                 //clear all the existing properties
146                 Iterator<VertexProperty<Object>> it = vertex.properties();
147                 while (it.hasNext()) {
148                         it.next().remove();
149                 }
150                 
151                 for (Entry<String, Object> property : object.getProperties().entrySet()) {
152
153                         if (property.getValue() instanceof List) {
154                                 for (Object subPropertyValue : (List<?>) property.getValue()) {
155                                         vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
156                                 }
157                         } else if (property.getValue() instanceof Set) {
158                                 for (Object subPropertyValue : (Set<?>) property.getValue()) {
159                                         vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
160                                 }
161                         } else {
162                                 vertex.property(property.getKey(), property.getValue());                                
163                         }
164                 }
165
166                 return vertex;
167         }
168
169         private Edge writeEdge(ChampRelationship relationship, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampMarshallingException {
170
171                 final Vertex source = writeVertex(relationship.getSource(), transaction);
172                 final Vertex target = writeVertex(relationship.getTarget(), transaction);
173                 final Edge edge;
174
175                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
176
177                 if (relationship.getKey().isPresent()) {
178                         final Iterator<Edge> edgeIter = graphInstance.edges(relationship.getKey().get());
179
180                         if (edgeIter.hasNext()) {
181                                 edge = edgeIter.next();
182                         } else throw new ChampRelationshipNotExistsException();
183                 } else {
184                         edge = source.addEdge(relationship.getType(), target);
185                 }
186
187                 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
188                         edge.property(property.getKey(), property.getValue());
189                 }
190
191                 return edge;
192         }
193         
194         private Edge replaceEdge(ChampRelationship relationship, ChampTransaction tx) throws  ChampRelationshipNotExistsException, ChampMarshallingException {
195                 final Edge edge;
196                 Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
197                 
198                 if(!relationship.getSource().getKey().isPresent() || !relationship.getTarget().getKey().isPresent()){
199                         throw new IllegalArgumentException("Invalid source/target");
200                 }
201                 
202                 if (relationship.getKey().isPresent()) {
203                         final Iterator<Edge> edgeIter = graphInstance.edges(relationship.getKey().get());
204
205
206                         if (edgeIter.hasNext()) {
207                                 edge = edgeIter.next();
208                                 //validate if the source/target are the same as before. Throw error if not the same
209                                 if (!edge.outVertex().id().equals(relationship.getSource().getKey().get())
210                                                 || !edge.inVertex().id().equals(relationship.getTarget().getKey().get())) {
211                                         throw new IllegalArgumentException("source/target can't be updated");
212                                 }
213
214                         } else throw new ChampRelationshipNotExistsException();
215                 } else {
216                         throw new ChampRelationshipNotExistsException();
217                 }
218                 
219                 // clear all the existing properties
220                 Iterator<Property<Object>> it = edge.properties();
221                 while (it.hasNext()) {
222                         it.next().remove();
223                 }
224                                 
225                 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
226                         edge.property(property.getKey(), property.getValue());
227                 }
228
229                 return edge;
230         }
231
232
233         protected abstract Graph getGraph();
234
235         
236
237         private Thread shutdownHook = new Thread() {
238                 @Override
239                 public void run() {
240                         try {
241                                 shutdown();
242                         } catch (IllegalStateException e) {
243                                 //Suppress, because shutdown() has already been called
244                         }
245                 }
246         };
247
248         protected boolean isShutdown() {
249                 return isShutdown.get();
250         }
251
252     @Override
253         public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException {
254       return queryObjects(queryParams, Optional.empty());
255     }
256
257         
258         @Override
259         public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException {
260           
261           if (isShutdown()) {
262             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
263           }
264         
265           // If we were not provided a transaction object then automatically open a transaction
266       // now.
267       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
268       
269       // Use the graph instance associated with our transaction.
270       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
271       
272       //If they provided the object key, do this the quick way rather than creating a traversal
273       if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString())) {
274         
275         try {
276           final Optional<ChampObject> object = 
277               retrieveObject(queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString()),
278                              transaction);
279                         
280           if (object.isPresent()) {
281             return Stream.of(object.get());
282           } else {
283             return Stream.empty();
284           }
285         } catch (ChampUnmarshallingException e) {
286           LOGGER.warn("Failed to unmarshall object", e);
287           return Stream.empty();
288         }
289       }
290
291       final GraphTraversal<Vertex, Vertex> query = graphInstance.traversal().V();
292
293       for (Entry<String, Object> filter : queryParams.entrySet()) {      
294         if (filter.getKey().equals(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
295           continue; //For performance reasons, the label is the last thing to be added
296         } else {
297           query.has(filter.getKey(), filter.getValue());
298         }
299       }
300
301       if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
302           hasLabel(query, queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString()));
303       }
304
305       final Iterator<ChampObject> objIter = new Iterator<ChampObject> () {
306         
307         private ChampObject next;
308
309
310           @Override
311           public boolean hasNext() {
312             while (query.hasNext()) {
313               try {
314                 next = getChampformer().unmarshallObject(query.next());
315                 return true;
316               } catch (ChampUnmarshallingException e) {
317                 LOGGER.warn("Failed to unmarshall tinkerpop vertex during query, returning partial results", e);
318               }                                 
319             }
320
321             // If we auto-created the transaction, then commit it now, otherwise it is up to the
322             // caller to decide when and if to do the commit.
323             if(!transaction.isPresent()) {
324               try {
325                 tx.commit(); //Danger ahead if this iterator is not completely consumed
326                              //then the transaction cache will hold stale values
327               } catch (ChampTransactionException e) {
328                 LOGGER.warn("Failed transaction commit due to: " + e.getMessage());
329               } 
330                            
331             }
332
333             next = null;
334             return false;
335           }
336
337           @Override
338           public ChampObject next() {
339             if (next == null) {
340               throw new NoSuchElementException();
341             }
342                                 
343                         return next;
344                   }
345       };
346
347       return StreamSupport.stream(Spliterators.spliteratorUnknownSize(objIter, 
348                                                                       Spliterator.ORDERED | Spliterator.NONNULL), 
349                                                                       false);
350     }
351
352     @Override
353     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException {
354       return retrieveObject(key, Optional.empty());
355     }
356     
357         @Override
358         public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException {
359           
360           if (isShutdown()) {
361             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
362           }
363           
364           // If we were not provided a transaction object then automatically open a transaction
365       // now.
366       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
367           
368           // Use the graph instance associated with our transaction.
369           Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
370           
371           final Iterator<Vertex> vertices = graphInstance.vertices(key);
372           final Optional<ChampObject> optionalObject;
373
374           if (!vertices.hasNext()) {
375             optionalObject = Optional.empty();
376             
377           } else {
378             optionalObject = Optional.of(getChampformer().unmarshallObject(vertices.next()));
379           }
380           
381           // If we auto-created the transaction, then commit it now, otherwise it is up to the
382           // caller to decide when and if to do the commit.
383           if(!transaction.isPresent()) {
384             tx.commit();
385           }
386           
387           return optionalObject;
388         }
389
390     @Override
391     public Stream<ChampRelationship> retrieveRelationships(ChampObject source) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException {
392       return retrieveRelationships(source, Optional.empty());
393     }
394     
395         @Override
396         public Stream<ChampRelationship> retrieveRelationships(ChampObject source, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException {
397             
398           if (isShutdown()) {
399             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
400           }
401                 
402           // If we were not provided a transaction object then automatically open a transaction
403       // now.
404       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
405       
406       // Use the graph instance associated with our transaction.
407       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
408       
409           final Vertex sourceVertex;
410
411           try {
412             sourceVertex = graphInstance.vertices(source.getKey().get()).next();
413           
414           } catch (NoSuchElementException e) {  
415             
416             // If we auto-created the transaction, then try to roll it back now, otherwise it is 
417             // up to the caller to decide when and if to do so.
418             if(!transaction.isPresent()) {
419                         tx.rollback();
420             }
421             
422                 throw new ChampObjectNotExistsException();
423       }
424
425           final Iterator<Edge> edges = sourceVertex.edges(Direction.BOTH);
426           final Iterator<ChampRelationship> relIter = new Iterator<ChampRelationship> () {
427
428             private ChampRelationship next;
429
430             @Override
431             public boolean hasNext() {
432               while (edges.hasNext()) {
433                 try {
434                   next = getChampformer().unmarshallRelationship(edges.next());
435                   return true;
436                 } catch (ChampUnmarshallingException e) {
437                   LOGGER.warn("Failed to unmarshall tinkerpop edge during query, returning partial results", e);
438                 }                                       
439               }
440
441               // If we auto-created the transaction, then commit it now, otherwise it is up to the
442               // caller to decide when and if to do the commit.
443               if(!transaction.isPresent()) {
444                 try {
445             tx.commit();   //Danger ahead if this iterator is not completely
446                            //consumed, then the transaction cache will be stale
447             
448           } catch (ChampTransactionException e) {
449             LOGGER.warn("Failed transaction commit due to: " + e.getMessage());
450           } 
451                              
452               }        
453               next = null;
454               return false;
455             }
456
457             @Override
458             public ChampRelationship next() {
459               if (next == null) {
460                 throw new NoSuchElementException();
461               }
462                                 
463               return next;
464             }
465           };
466
467           return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
468                 relIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
469         }
470
471         
472         @Override
473     public ChampObject doStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
474                 
475           ChampTransaction tx = null;
476           
477           try {
478             
479             // If we were not provided a transaction object then automatically open a transaction
480             // now.
481             tx = getOrCreateTransactionInstance(transaction);
482             
483             // Now, store the object that we were supplied.
484             final Vertex vertex = writeVertex(object, tx);
485             
486             // Only auto-commit this operation if we were NOT provided a transaction context, 
487             // otherwise it is the caller's responsibility to commit the transaction when it
488             // is appropriate to do so.
489             if(!transaction.isPresent()) {
490               tx.commit();
491             }
492                
493             // Marshal the resulting vertex into a ChampObject and return it to the caller.
494             return ChampObject.create()
495                                    .from(object)
496                                    .withKey(vertex.id())
497                                    .build();
498
499           } catch (ChampObjectNotExistsException e) {
500         
501         // Something went wrong.  If we auto-created the transaction, then try to roll it back
502             // now. If we were supplied a transaction context then it is the caller's responsibility 
503             // to decide whether or not to roll it back.
504             if(!transaction.isPresent()) {
505               tx.rollback();
506             }
507                 
508             // Rethrow the exception.
509             throw e;
510           }
511         }
512         
513         @Override
514         public ChampObject doReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
515
516           ChampTransaction tx = null;
517           
518           try {
519                   
520             // If we were not provided a transaction object then automatically open a transaction
521             // now.
522             tx = getOrCreateTransactionInstance(transaction);
523                 
524             final Vertex vertex = replaceVertex(object, tx);
525
526         // Only auto-commit this operation if we were NOT provided a transaction context, 
527         // otherwise it is the caller's responsibility to commit the transaction when it
528         // is appropriate to do so.
529         if(!transaction.isPresent()) {
530           tx.commit();
531         }
532                         
533         // Marshal the resulting vertex into a ChampObject and return it to the caller.
534         return ChampObject.create()
535                                .from(object)
536                                .withKey(vertex.id())
537                                .build();
538                         
539                 } catch (ChampObjectNotExistsException e) {
540                   
541                   // Something went wrong.  If we auto-created the transaction, then try to roll it back
542               // now. If we were supplied a transaction context then it is the caller's responsibility 
543               // to decide whether or not to roll it back.
544               if(!transaction.isPresent()) {
545                 tx.rollback();
546               }
547
548               // Rethrow the exception.
549               throw e;
550                 }
551         }
552
553         @Override
554         public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
555           
556                 if (isShutdown()) {
557                   throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
558                 }
559
560             // If we were not provided a transaction object then automatically open a transaction
561         // now.
562         ChampTransaction tx = getOrCreateTransactionInstance(transaction);
563                 
564             // Use the graph instance associated with our transaction.
565             Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
566               
567                 final Iterator<Vertex> vertex = graphInstance.vertices(key);
568
569                 if (!vertex.hasNext()) {
570                   
571                   // If we auto-created the transaction, then roll it back now, otherwise it
572                   // is up to the caller to make that determination.
573                   if(!transaction.isPresent()) {
574                         tx.rollback();
575                   }
576                   
577                   throw new ChampObjectNotExistsException();
578                 }
579
580                 // Remove the vertex.
581                 vertex.next().remove();
582                 
583                 // If we auto-created the transaction, then commit it now, otherwise it
584                 // is up to the caller to decide if and when to commit.
585                 if(!transaction.isPresent()) {
586                   tx.commit();
587                 }
588         }
589
590     @Override
591     public ChampRelationship doStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) 
592         throws ChampUnmarshallingException, 
593                ChampObjectNotExistsException, 
594                ChampRelationshipNotExistsException, 
595                ChampMarshallingException, 
596                ChampTransactionException  {
597       
598       // If we were not provided a transaction object then automatically open a transaction
599       // now.
600       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
601       
602       try {
603         
604         // Store the edge in the graph.
605         final Edge edge = writeEdge(relationship, tx);
606         
607         // Unmarshal the stored edge into a ChampRelationship object
608         ChampRelationship storedRelationship = getChampformer().unmarshallRelationship(edge);
609         
610         // If we auto-created the transaction, then commit it now, otherwise it
611         // is up to the caller to decide if and when to commit.
612         if(!transaction.isPresent()) {
613           tx.commit();
614         }
615         
616         // Finally, return the result to the caller.
617         return storedRelationship;
618         
619       } catch (ChampObjectNotExistsException | 
620                ChampRelationshipNotExistsException | 
621                ChampUnmarshallingException | 
622                ChampMarshallingException e) {
623         
624         // If we auto-create the transaction, then try to roll it back, otherwise
625         // it is up to the caller to decide when and if to do so.
626         if(!transaction.isPresent()) {
627           tx.rollback();
628         }
629         throw e;
630       }   
631     }
632     
633         
634         @Override
635         public ChampRelationship doReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
636                         throws ChampUnmarshallingException, 
637                                ChampRelationshipNotExistsException, 
638                                ChampMarshallingException, 
639                                ChampTransactionException  {
640
641       // If we were not provided a transaction object then automatically open a transaction
642       // now.
643       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
644       
645       try {
646         final Edge edge = replaceEdge(relationship, tx);
647
648                   ChampRelationship unmarshalledRelationship = getChampformer().unmarshallRelationship(edge);
649
650                   // If we auto-created the transaction, then commit it now, otherwise it
651                   // is up to the caller to decide if and when to commit.
652                   if(!transaction.isPresent()) {
653                           tx.commit();
654                   }
655
656                   return unmarshalledRelationship;
657                         
658       } catch ( ChampRelationshipNotExistsException | ChampUnmarshallingException | ChampMarshallingException e) {
659         
660         // it is up to the caller to decide when and if to do so.
661         if(!transaction.isPresent()) {
662           tx.rollback();
663         }
664
665         throw e;
666       }
667         }  
668
669         @Override
670     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException {
671           return queryRelationships(queryParams, Optional.empty());
672         }
673            
674         @Override
675         public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException {
676           
677           if (isShutdown()) {
678             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
679           }
680
681       // If we were not provided a transaction object then automatically open a transaction
682       // now.
683       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
684       
685       // Use the graph instance associated with our transaction.
686       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
687       
688           // If they provided the relationship key, do this the quick way rather than creating a traversal
689           if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString())) {
690             try {
691               final Optional<ChampRelationship> relationship = retrieveRelationship(queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString()),
692                                                                                     Optional.of(tx));
693                         
694               if (relationship.isPresent()) {
695                 return Stream.of(relationship.get());
696               
697               } else {
698                 return Stream.empty();
699               }
700             } catch (ChampUnmarshallingException e) {
701               
702               LOGGER.warn("Failed to unmarshall relationship", e);
703               return Stream.empty();
704             }
705           }
706          
707           final GraphTraversal<Edge, Edge> query = graphInstance.traversal().E();
708
709           for (Entry<String, Object> filter : queryParams.entrySet()) {
710             if (filter.getKey().equals(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
711               continue; //Add the label last for performance reasons
712             } else {
713               query.has(filter.getKey(), filter.getValue());
714             }
715           }
716
717           if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
718                   hasLabel(query, queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString()));
719           }
720
721           final Iterator<ChampRelationship> objIter = new Iterator<ChampRelationship> () {
722         
723             private ChampRelationship next;
724
725             @Override
726             public boolean hasNext() {
727               while (query.hasNext()) {
728                 try {
729                   next = getChampformer().unmarshallRelationship(query.next());
730                   return true;
731                 } catch (ChampUnmarshallingException e) {
732                   LOGGER.warn("Failed to unmarshall tinkerpop vertex during query, returning partial results", e);
733                 }                                       
734               }
735
736               // If we auto-created the transaction, then commit it now, otherwise it
737               // is up to the caller to decide if and when to commit.
738               if(!transaction.isPresent()) {
739                 try { 
740             tx.commit();  //Danger ahead if this iterator is not completely
741                           //consumed, then the transaction cache will be stale
742             
743           } catch (ChampTransactionException e) {
744             LOGGER.warn("Failed transaction commit due to " + e.getMessage());
745           } 
746                              
747               }
748                                         
749               next = null;
750               return false;
751             }
752
753             @Override
754             public ChampRelationship next() {
755               if (next == null) {
756                 throw new NoSuchElementException();
757               }
758                                 
759               return next;
760             }
761           };
762
763           return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
764                     objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
765         }
766
767     @Override
768     public Optional<ChampRelationship> retrieveRelationship(Object key)
769             throws ChampUnmarshallingException, ChampTransactionException {
770       return retrieveRelationship(key, Optional.empty());
771     }
772     
773     @Override
774     public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction)
775                         throws ChampUnmarshallingException, ChampTransactionException {
776           
777       if (isShutdown()) {
778         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
779       }
780
781       // If we were not provided a transaction object then automatically open a transaction
782       // now.
783       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
784       
785       // Use the graph instance associated with our transaction.
786       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
787       
788       final Iterator<Edge> edge = graphInstance.edges(key);
789       final Optional<ChampRelationship> optionalRelationship;
790
791       if (!edge.hasNext()) {
792         optionalRelationship = Optional.empty();
793       } else {
794         optionalRelationship = Optional.of(getChampformer().unmarshallRelationship(edge.next()));
795       }
796
797       // If we auto-created the transaction, then commit it now, otherwise it
798       // is up to the caller to decide if and when to commit.
799       if(!transaction.isPresent()) {
800         tx.commit();
801       }
802
803       return optionalRelationship;
804         }
805
806     @Override
807         public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
808
809       if (isShutdown()) {
810         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
811       }
812       
813       if (!relationship.getKey().isPresent()) {
814         throw new IllegalArgumentException("Key must be provided when deleting a relationship");
815       }
816
817       // If we were not provided a transaction object then automatically open a transaction
818       // now.
819       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
820       
821       // Use the graph instance associated with our transaction.
822       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
823       
824       final Iterator<Edge> edge = graphInstance.edges(relationship.getKey().get());
825                 
826       if (!edge.hasNext()) {
827         
828         // If we auto-created the transaction, then try to roll it back now, otherwise it
829         // is up to the caller to decide if and when to do so.
830         if(!transaction.isPresent()) {
831           tx.rollback();
832         }
833         
834         throw new ChampRelationshipNotExistsException();
835       }
836       
837           edge.next().remove();
838
839           // If we auto-created the transaction, then commit it now, otherwise it
840           // is up to the caller to decide if and when to commit.
841           if(!transaction.isPresent()) {
842             tx.commit();
843           }
844     }
845
846         
847     @Override
848     public ChampPartition doStorePartition(ChampPartition submittedPartition, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampTransactionException {
849                 
850           if (isShutdown()) {
851             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
852           }
853
854       // If we were not provided a transaction object then automatically open a transaction
855       // now.
856       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
857             
858           try {
859             final HashMap<ChampObject, ChampObject> objectsWithKeys = new HashMap<ChampObject, ChampObject> ();
860             final CreateChampPartitionable storedPartition = ChampPartition.create();
861
862             for (ChampObject champObject : submittedPartition.getChampObjects()) {
863               final Vertex vertex = writeVertex(champObject, tx);
864               objectsWithKeys.put(champObject, ChampObject.create()
865                                                                                                                 .from(champObject)
866                                                                                                                 .withKey(vertex.id())
867                                                                                                                 .build());
868             }
869
870             for (ChampRelationship champRelationship : submittedPartition.getChampRelationships()) {
871               
872               if (!objectsWithKeys.containsKey(champRelationship.getSource())) {
873                 final Vertex vertex = writeVertex(champRelationship.getSource(), tx);
874
875                 objectsWithKeys.put(champRelationship.getSource(), ChampObject.create()
876                                                                                                                      .from(champRelationship.getSource())
877                                                                                                                      .withKey(vertex.id())
878                                                                                                                      .build());
879               }
880
881               if (!objectsWithKeys.containsKey(champRelationship.getTarget())) {
882                 final Vertex vertex = writeVertex(champRelationship.getTarget(), tx);
883
884                 objectsWithKeys.put(champRelationship.getTarget(), ChampObject.create()
885                                                                                                                         .from(champRelationship.getTarget())
886                                                                                                                         .withKey(vertex.id())
887                                                                                                                         .build());
888               }
889
890               final ChampRelationship.Builder relWithKeysBuilder = new ChampRelationship.Builder(objectsWithKeys.get(champRelationship.getSource()),
891                                                                                                                                                                                          objectsWithKeys.get(champRelationship.getTarget()),
892                                                                                                                                                                                          champRelationship.getType());
893
894               if (champRelationship.getKey().isPresent()) {
895                 relWithKeysBuilder.key(champRelationship.getKey().get());
896               }
897                                 
898               relWithKeysBuilder.properties(champRelationship.getProperties());
899
900               final Edge edge = writeEdge(relWithKeysBuilder.build(), tx);
901
902               storedPartition.withRelationship(ChampRelationship.create()
903                                                                                                                                 .from(champRelationship)
904                                                                                                                                 .withKey(edge.id())
905                                                                                                                                 .build());
906             }
907
908             for (ChampObject object : objectsWithKeys.values()) {
909               storedPartition.withObject(object);
910             }
911
912             // If we auto-created the transaction, then commit it now, otherwise it
913         // is up to the caller to decide if and when to commit.
914         if(!transaction.isPresent()) {
915           tx.commit();
916         }
917             
918         return storedPartition.build();
919                         
920       } catch (ChampObjectNotExistsException | ChampMarshallingException e) {
921         
922         // If we auto-created the transaction, then try to roll it back now, otherwise it
923         // is up to the caller to decide if and when to do so.
924         if(!transaction.isPresent()) {
925           tx.rollback();
926         }  
927
928                 throw e;
929       }
930         }
931
932     @Override
933         public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
934       
935       if (isShutdown()) {
936         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
937       }
938
939       // If we were not provided a transaction object then automatically open a transaction
940       // now.
941       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
942       
943       // Use the graph instance associated with our transaction.
944       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
945       
946       for (ChampObject champObject : graph.getChampObjects()) {
947         try {
948           final Object vertexId = champObject.getKey().get();
949           final Iterator<Vertex> vertex = graphInstance.vertices(vertexId);
950         
951           if (vertex.hasNext()) {
952             vertex.next().remove();
953           }
954                 } catch (NoSuchElementException e) {
955                   
956                   // If we auto-created the transaction, then try to roll it back now, otherwise it
957               // is up to the caller to decide if and when to do so.
958               if(!transaction.isPresent()) {
959                 tx.rollback();
960               } 
961
962               throw new IllegalArgumentException("Must pass a key to delete an object");
963             }
964       }
965
966       for (ChampRelationship champRelationship : graph.getChampRelationships()) {
967         try {
968           final Iterator<Edge> edge = graphInstance.edges(champRelationship.getKey().get());
969                 
970           if (edge.hasNext()) {
971             edge.next().remove();
972           }
973         } catch (NoSuchElementException e) {
974           
975           // If we auto-created the transaction, then try to roll it back now, otherwise it
976           // is up to the caller to decide if and when to do so.
977           if(!transaction.isPresent()) {
978             tx.rollback();
979           }
980
981           throw new IllegalArgumentException("Must pass a key to delete a relationship");
982         }
983       }
984
985       // If we auto-created the transaction, then commit it now, otherwise it
986       // is up to the caller to decide if and when to commit.
987       if(!transaction.isPresent()) {
988         tx.commit();
989       }
990         }
991
992         @Override
993         public void shutdown() {
994
995                 if (isShutdown.compareAndSet(false, true)) {
996                   super.shutdown();
997                         try {
998                                 getGraph().close();
999                         } catch (Throwable t) {
1000                                 LOGGER.error("Exception while shutting down graph", t);
1001                         }
1002                 } else {
1003                         throw new IllegalStateException("Cannot call shutdown() after shutdown() was already initiated");
1004                 }
1005         }
1006
1007         @Override
1008         public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1009                 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
1010
1011                 if (getGraph().features().graph().variables().supportsVariables()) {
1012                         try {
1013                                 getGraph().variables().set("schema", getObjectMapper().writeValueAsBytes(schema));
1014                         } catch (JsonProcessingException e) {
1015                                 throw new RuntimeException(e);
1016                         }
1017                 } else {
1018                         super.storeSchema(schema);
1019                 }
1020         }
1021
1022         @Override
1023         public ChampSchema retrieveSchema() {
1024                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveSchema() after shutdown has been initiated");
1025
1026                 if (getGraph().features().graph().variables().supportsVariables()) {
1027                         final Optional<byte[]> schema = getGraph().variables().get("schema");
1028
1029                         if (schema.isPresent()) {
1030                                 try {
1031                                         return getObjectMapper().readValue(schema.get(), ChampSchema.class);
1032                                 } catch (IOException e) {
1033                                         throw new RuntimeException(e);
1034                                 }
1035                         }
1036                 }
1037
1038                 return super.retrieveSchema();
1039         }
1040
1041         @Override
1042         public void deleteSchema() {
1043                 if (isShutdown()) throw new IllegalStateException("Cannot call deleteSchema() after shutdown has been initiated");
1044
1045                 if (getGraph().features().graph().variables().supportsVariables()) {
1046                         getGraph().variables().remove("schema");
1047                 } else {
1048                         super.deleteSchema();
1049                 }
1050         }
1051         
1052         public ChampTransaction getOrCreateTransactionInstance(Optional<ChampTransaction> transaction) {
1053           
1054           ChampTransaction tx = null;
1055           
1056       // If we were not provided a transaction object then automatically open a transaction
1057       // now.
1058       if(!transaction.isPresent()) {
1059         
1060           tx = new TinkerpopTransaction(getGraph());
1061           
1062       } else {
1063         tx = transaction.get();
1064       }
1065       
1066       return tx;
1067         }
1068 }