Updated champ-lib to use the correct logger
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / graph / impl / AbstractTinkerpopChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.graph.impl;
22
23 import java.io.IOException;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.NoSuchElementException;
30 import java.util.Optional;
31 import java.util.Set;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.stream.Stream;
36 import java.util.stream.StreamSupport;
37
38 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
39 import org.apache.tinkerpop.gremlin.structure.Direction;
40 import org.apache.tinkerpop.gremlin.structure.Edge;
41 import org.apache.tinkerpop.gremlin.structure.Graph;
42 import org.apache.tinkerpop.gremlin.structure.Property;
43 import org.apache.tinkerpop.gremlin.structure.Vertex;
44 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
45 import org.onap.aai.champcore.ChampCoreMsgs;
46 import org.onap.aai.champcore.ChampTransaction;
47 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
48 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
49 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
50 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
51 import org.onap.aai.champcore.exceptions.ChampTransactionException;
52 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
53 import org.onap.aai.champcore.model.ChampObject;
54 import org.onap.aai.champcore.model.ChampPartition;
55 import org.onap.aai.champcore.model.ChampRelationship;
56 import org.onap.aai.champcore.model.ChampSchema;
57 import org.onap.aai.champcore.model.fluent.partition.CreateChampPartitionable;
58 import org.onap.aai.champcore.transform.TinkerpopChampformer;
59 import org.onap.aai.cl.api.Logger;
60 import org.onap.aai.cl.eelf.LoggerFactory;
61
62 import com.fasterxml.jackson.core.JsonProcessingException;
63 import com.fasterxml.jackson.databind.ObjectMapper;
64
65 public abstract class AbstractTinkerpopChampGraph extends AbstractValidatingChampGraph {
66         
67         private static final Logger LOGGER = LoggerFactory.getInstance().getLogger(AbstractTinkerpopChampGraph.class);
68         private static final TinkerpopChampformer TINKERPOP_CHAMPFORMER = new TinkerpopChampformer();
69         private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
70
71         private volatile AtomicBoolean isShutdown;
72
73         protected AbstractTinkerpopChampGraph(Map<String, Object> properties) {
74           super(properties);
75           
76           isShutdown = new AtomicBoolean(false);
77       Runtime.getRuntime().addShutdownHook(shutdownHook);
78         }
79         
80         private static final TinkerpopChampformer getChampformer() {
81                 return TINKERPOP_CHAMPFORMER;
82         }
83
84         private static final ObjectMapper getObjectMapper() {
85                 return OBJECT_MAPPER;
86         }
87
88         public abstract GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type);
89         
90         public ChampTransaction openTransaction() {
91           
92           return new TinkerpopTransaction(getGraph());
93         }         
94         
95         private Vertex writeVertex(ChampObject object, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampMarshallingException {
96                 final Vertex vertex;
97                 
98                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
99                 
100                 if (object.getKey().isPresent()) {
101                         final Iterator<Vertex> vertexIter = graphInstance.vertices(object.getKey().get());
102
103                         if (vertexIter.hasNext()) {
104                                 vertex = vertexIter.next();
105                         } else throw new ChampObjectNotExistsException();
106                 } else {
107                         vertex = graphInstance.addVertex(object.getType());
108                 }
109
110                 for (Entry<String, Object> property : object.getProperties().entrySet()) {
111
112                         if (property.getValue() instanceof List) {
113                                 for (Object subPropertyValue : (List<?>) property.getValue()) {
114                                         vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
115                                 }
116                         } else if (property.getValue() instanceof Set) {
117                                 for (Object subPropertyValue : (Set<?>) property.getValue()) {
118                                         vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
119                                 }
120                         } else {
121                                 vertex.property(property.getKey(), property.getValue());
122                         }
123                 }
124
125                 return vertex;
126         }
127         
128         private Vertex replaceVertex(ChampObject object, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampMarshallingException {
129                 Vertex vertex;
130                 
131                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
132                 
133                 if (object.getKey().isPresent()) {
134                         final Iterator<Vertex> vertexIter = graphInstance.vertices(object.getKey().get());
135
136                         if (vertexIter.hasNext()) {
137                                 vertex = vertexIter.next();
138                         } else throw new ChampObjectNotExistsException();
139                 } else {
140                         throw new ChampObjectNotExistsException();
141                 }
142
143                 //clear all the existing properties
144                 Iterator<VertexProperty<Object>> it = vertex.properties();
145                 while (it.hasNext()) {
146                         it.next().remove();
147                 }
148                 
149                 for (Entry<String, Object> property : object.getProperties().entrySet()) {
150
151                         if (property.getValue() instanceof List) {
152                                 for (Object subPropertyValue : (List<?>) property.getValue()) {
153                                         vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
154                                 }
155                         } else if (property.getValue() instanceof Set) {
156                                 for (Object subPropertyValue : (Set<?>) property.getValue()) {
157                                         vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
158                                 }
159                         } else {
160                                 vertex.property(property.getKey(), property.getValue());                                
161                         }
162                 }
163
164                 return vertex;
165         }
166
167         private Edge writeEdge(ChampRelationship relationship, ChampTransaction transaction) throws ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampMarshallingException {
168
169                 final Vertex source = writeVertex(relationship.getSource(), transaction);
170                 final Vertex target = writeVertex(relationship.getTarget(), transaction);
171                 final Edge edge;
172
173                 Graph graphInstance = ((TinkerpopTransaction)transaction).getGraphInstance();
174
175                 if (relationship.getKey().isPresent()) {
176                         final Iterator<Edge> edgeIter = graphInstance.edges(relationship.getKey().get());
177
178                         if (edgeIter.hasNext()) {
179                                 edge = edgeIter.next();
180                         } else throw new ChampRelationshipNotExistsException();
181                 } else {
182                         edge = source.addEdge(relationship.getType(), target);
183                 }
184
185                 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
186                         edge.property(property.getKey(), property.getValue());
187                 }
188
189                 return edge;
190         }
191         
192         private Edge replaceEdge(ChampRelationship relationship, ChampTransaction tx) throws  ChampRelationshipNotExistsException, ChampMarshallingException {
193                 final Edge edge;
194                 Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
195                 
196                 if(!relationship.getSource().getKey().isPresent() || !relationship.getTarget().getKey().isPresent()){
197                         throw new IllegalArgumentException("Invalid source/target");
198                 }
199                 
200                 if (relationship.getKey().isPresent()) {
201                         final Iterator<Edge> edgeIter = graphInstance.edges(relationship.getKey().get());
202
203
204                         if (edgeIter.hasNext()) {
205                                 edge = edgeIter.next();
206                                 //validate if the source/target are the same as before. Throw error if not the same
207                                 if (!edge.outVertex().id().equals(relationship.getSource().getKey().get())
208                                                 || !edge.inVertex().id().equals(relationship.getTarget().getKey().get())) {
209                                         throw new IllegalArgumentException("source/target can't be updated");
210                                 }
211
212                         } else throw new ChampRelationshipNotExistsException();
213                 } else {
214                         throw new ChampRelationshipNotExistsException();
215                 }
216                 
217                 // clear all the existing properties
218                 Iterator<Property<Object>> it = edge.properties();
219                 while (it.hasNext()) {
220                         it.next().remove();
221                 }
222                                 
223                 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
224                         edge.property(property.getKey(), property.getValue());
225                 }
226
227                 return edge;
228         }
229
230
231         protected abstract Graph getGraph();
232
233         
234
235         private Thread shutdownHook = new Thread() {
236                 @Override
237                 public void run() {
238                         try {
239                                 shutdown();
240                         } catch (IllegalStateException e) {
241                                 //Suppress, because shutdown() has already been called
242                         }
243                 }
244         };
245
246         protected boolean isShutdown() {
247                 return isShutdown.get();
248         }
249
250     @Override
251         public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException {
252       return queryObjects(queryParams, Optional.empty());
253     }
254
255         
256         @Override
257         public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException {
258           
259           if (isShutdown()) {
260             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
261           }
262         
263           // If we were not provided a transaction object then automatically open a transaction
264       // now.
265       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
266       
267       // Use the graph instance associated with our transaction.
268       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
269       
270       //If they provided the object key, do this the quick way rather than creating a traversal
271       if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString())) {
272         
273         try {
274           final Optional<ChampObject> object = 
275               retrieveObject(queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString()),
276                              transaction);
277                         
278           if (object.isPresent()) {
279             return Stream.of(object.get());
280           } else {
281             return Stream.empty();
282           }
283         } catch (ChampUnmarshallingException e) {
284           LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN, 
285               "Failed to unmarshall object. " + e.getMessage());
286           return Stream.empty();
287         }
288       }
289
290       final GraphTraversal<Vertex, Vertex> query = graphInstance.traversal().V();
291
292       for (Entry<String, Object> filter : queryParams.entrySet()) {      
293         if (filter.getKey().equals(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
294           continue; //For performance reasons, the label is the last thing to be added
295         } else {
296           query.has(filter.getKey(), filter.getValue());
297         }
298       }
299
300       if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
301           hasLabel(query, queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString()));
302       }
303
304       final Iterator<ChampObject> objIter = new Iterator<ChampObject> () {
305         
306         private ChampObject next;
307
308
309           @Override
310           public boolean hasNext() {
311             while (query.hasNext()) {
312               try {
313                 next = getChampformer().unmarshallObject(query.next());
314                 return true;
315               } catch (ChampUnmarshallingException e) {
316                 LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN, 
317                     "Failed to unmarshall tinkerpop vertex during query, returning partial results" + e.getMessage());
318               }                                 
319             }
320
321             // If we auto-created the transaction, then commit it now, otherwise it is up to the
322             // caller to decide when and if to do the commit.
323             if(!transaction.isPresent()) {
324               try {
325                 tx.commit(); //Danger ahead if this iterator is not completely consumed
326                              //then the transaction cache will hold stale values
327               } catch (ChampTransactionException e) {
328                 LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN, 
329                     "Failed transaction commit due to: " + e.getMessage());
330               } 
331                            
332             }
333
334             next = null;
335             return false;
336           }
337
338           @Override
339           public ChampObject next() {
340             if (next == null) {
341               throw new NoSuchElementException();
342             }
343                                 
344                         return next;
345                   }
346       };
347
348       return StreamSupport.stream(Spliterators.spliteratorUnknownSize(objIter, 
349                                                                       Spliterator.ORDERED | Spliterator.NONNULL), 
350                                                                       false);
351     }
352
353     @Override
354     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException {
355       return retrieveObject(key, Optional.empty());
356     }
357     
358         @Override
359         public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException {
360           
361           if (isShutdown()) {
362             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
363           }
364           
365           // If we were not provided a transaction object then automatically open a transaction
366       // now.
367       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
368           
369           // Use the graph instance associated with our transaction.
370           Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
371           
372           final Iterator<Vertex> vertices = graphInstance.vertices(key);
373           final Optional<ChampObject> optionalObject;
374
375           if (!vertices.hasNext()) {
376             optionalObject = Optional.empty();
377             
378           } else {
379             optionalObject = Optional.of(getChampformer().unmarshallObject(vertices.next()));
380           }
381           
382           // If we auto-created the transaction, then commit it now, otherwise it is up to the
383           // caller to decide when and if to do the commit.
384           if(!transaction.isPresent()) {
385             tx.commit();
386           }
387           
388           return optionalObject;
389         }
390
391     @Override
392     public Stream<ChampRelationship> retrieveRelationships(ChampObject source) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException {
393       return retrieveRelationships(source, Optional.empty());
394     }
395     
396         @Override
397         public Stream<ChampRelationship> retrieveRelationships(ChampObject source, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException {
398             
399           if (isShutdown()) {
400             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
401           }
402                 
403           // If we were not provided a transaction object then automatically open a transaction
404       // now.
405       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
406       
407       // Use the graph instance associated with our transaction.
408       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
409       
410           final Vertex sourceVertex;
411
412           try {
413             sourceVertex = graphInstance.vertices(source.getKey().get()).next();
414           
415           } catch (NoSuchElementException e) {  
416             
417             // If we auto-created the transaction, then try to roll it back now, otherwise it is 
418             // up to the caller to decide when and if to do so.
419             if(!transaction.isPresent()) {
420                         tx.rollback();
421             }
422             
423                 throw new ChampObjectNotExistsException();
424       }
425
426           final Iterator<Edge> edges = sourceVertex.edges(Direction.BOTH);
427           final Iterator<ChampRelationship> relIter = new Iterator<ChampRelationship> () {
428
429             private ChampRelationship next;
430
431             @Override
432             public boolean hasNext() {
433               while (edges.hasNext()) {
434                 try {
435                   next = getChampformer().unmarshallRelationship(edges.next());
436                   return true;
437                 } catch (ChampUnmarshallingException e) {
438                   LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN, 
439                       "Failed to unmarshall tinkerpop edge during query, returning partial results" + e.getMessage());
440                 }                                       
441               }
442
443               // If we auto-created the transaction, then commit it now, otherwise it is up to the
444               // caller to decide when and if to do the commit.
445               if(!transaction.isPresent()) {
446                 try {
447             tx.commit();   //Danger ahead if this iterator is not completely
448                            //consumed, then the transaction cache will be stale
449             
450           } catch (ChampTransactionException e) {
451             LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN, 
452                 "Failed transaction commit due to: " + e.getMessage());
453           } 
454                              
455               }        
456               next = null;
457               return false;
458             }
459
460             @Override
461             public ChampRelationship next() {
462               if (next == null) {
463                 throw new NoSuchElementException();
464               }
465                                 
466               return next;
467             }
468           };
469
470           return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
471                 relIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
472         }
473
474         
475         @Override
476     public ChampObject doStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
477                 
478           ChampTransaction tx = null;
479           
480           try {
481             
482             // If we were not provided a transaction object then automatically open a transaction
483             // now.
484             tx = getOrCreateTransactionInstance(transaction);
485             
486             // Now, store the object that we were supplied.
487             final Vertex vertex = writeVertex(object, tx);
488             
489             // Only auto-commit this operation if we were NOT provided a transaction context, 
490             // otherwise it is the caller's responsibility to commit the transaction when it
491             // is appropriate to do so.
492             if(!transaction.isPresent()) {
493               tx.commit();
494             }
495                
496             // Marshal the resulting vertex into a ChampObject and return it to the caller.
497             return ChampObject.create()
498                                    .from(object)
499                                    .withKey(vertex.id())
500                                    .build();
501
502           } catch (ChampObjectNotExistsException e) {
503         
504         // Something went wrong.  If we auto-created the transaction, then try to roll it back
505             // now. If we were supplied a transaction context then it is the caller's responsibility 
506             // to decide whether or not to roll it back.
507             if(!transaction.isPresent()) {
508               tx.rollback();
509             }
510                 
511             // Rethrow the exception.
512             throw e;
513           }
514         }
515         
516         @Override
517         public ChampObject doReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
518
519           ChampTransaction tx = null;
520           
521           try {
522                   
523             // If we were not provided a transaction object then automatically open a transaction
524             // now.
525             tx = getOrCreateTransactionInstance(transaction);
526                 
527             final Vertex vertex = replaceVertex(object, tx);
528
529         // Only auto-commit this operation if we were NOT provided a transaction context, 
530         // otherwise it is the caller's responsibility to commit the transaction when it
531         // is appropriate to do so.
532         if(!transaction.isPresent()) {
533           tx.commit();
534         }
535                         
536         // Marshal the resulting vertex into a ChampObject and return it to the caller.
537         return ChampObject.create()
538                                .from(object)
539                                .withKey(vertex.id())
540                                .build();
541                         
542                 } catch (ChampObjectNotExistsException e) {
543                   
544                   // Something went wrong.  If we auto-created the transaction, then try to roll it back
545               // now. If we were supplied a transaction context then it is the caller's responsibility 
546               // to decide whether or not to roll it back.
547               if(!transaction.isPresent()) {
548                 tx.rollback();
549               }
550
551               // Rethrow the exception.
552               throw e;
553                 }
554         }
555
556         @Override
557         public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
558           
559                 if (isShutdown()) {
560                   throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
561                 }
562
563             // If we were not provided a transaction object then automatically open a transaction
564         // now.
565         ChampTransaction tx = getOrCreateTransactionInstance(transaction);
566                 
567             // Use the graph instance associated with our transaction.
568             Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
569               
570                 final Iterator<Vertex> vertex = graphInstance.vertices(key);
571
572                 if (!vertex.hasNext()) {
573                   
574                   // If we auto-created the transaction, then roll it back now, otherwise it
575                   // is up to the caller to make that determination.
576                   if(!transaction.isPresent()) {
577                         tx.rollback();
578                   }
579                   
580                   throw new ChampObjectNotExistsException();
581                 }
582
583                 // Remove the vertex.
584                 vertex.next().remove();
585                 
586                 // If we auto-created the transaction, then commit it now, otherwise it
587                 // is up to the caller to decide if and when to commit.
588                 if(!transaction.isPresent()) {
589                   tx.commit();
590                 }
591         }
592
593     @Override
594     public ChampRelationship doStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) 
595         throws ChampUnmarshallingException, 
596                ChampObjectNotExistsException, 
597                ChampRelationshipNotExistsException, 
598                ChampMarshallingException, 
599                ChampTransactionException  {
600       
601       // If we were not provided a transaction object then automatically open a transaction
602       // now.
603       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
604       
605       try {
606         
607         // Store the edge in the graph.
608         final Edge edge = writeEdge(relationship, tx);
609         
610         // Unmarshal the stored edge into a ChampRelationship object
611         ChampRelationship storedRelationship = getChampformer().unmarshallRelationship(edge);
612         
613         // If we auto-created the transaction, then commit it now, otherwise it
614         // is up to the caller to decide if and when to commit.
615         if(!transaction.isPresent()) {
616           tx.commit();
617         }
618         
619         // Finally, return the result to the caller.
620         return storedRelationship;
621         
622       } catch (ChampObjectNotExistsException | 
623                ChampRelationshipNotExistsException | 
624                ChampUnmarshallingException | 
625                ChampMarshallingException e) {
626         
627         // If we auto-create the transaction, then try to roll it back, otherwise
628         // it is up to the caller to decide when and if to do so.
629         if(!transaction.isPresent()) {
630           tx.rollback();
631         }
632         throw e;
633       }   
634     }
635     
636         
637         @Override
638         public ChampRelationship doReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
639                         throws ChampUnmarshallingException, 
640                                ChampRelationshipNotExistsException, 
641                                ChampMarshallingException, 
642                                ChampTransactionException  {
643
644       // If we were not provided a transaction object then automatically open a transaction
645       // now.
646       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
647       
648       try {
649         final Edge edge = replaceEdge(relationship, tx);
650
651                   ChampRelationship unmarshalledRelationship = getChampformer().unmarshallRelationship(edge);
652
653                   // If we auto-created the transaction, then commit it now, otherwise it
654                   // is up to the caller to decide if and when to commit.
655                   if(!transaction.isPresent()) {
656                           tx.commit();
657                   }
658
659                   return unmarshalledRelationship;
660                         
661       } catch ( ChampRelationshipNotExistsException | ChampUnmarshallingException | ChampMarshallingException e) {
662         
663         // it is up to the caller to decide when and if to do so.
664         if(!transaction.isPresent()) {
665           tx.rollback();
666         }
667
668         throw e;
669       }
670         }  
671
672         @Override
673     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException {
674           return queryRelationships(queryParams, Optional.empty());
675         }
676            
677         @Override
678         public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException {
679           
680           if (isShutdown()) {
681             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
682           }
683
684       // If we were not provided a transaction object then automatically open a transaction
685       // now.
686       final ChampTransaction tx = getOrCreateTransactionInstance(transaction);
687       
688       // Use the graph instance associated with our transaction.
689       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
690       
691           // If they provided the relationship key, do this the quick way rather than creating a traversal
692           if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString())) {
693             try {
694               final Optional<ChampRelationship> relationship = retrieveRelationship(queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString()),
695                                                                                     Optional.of(tx));
696                         
697               if (relationship.isPresent()) {
698                 return Stream.of(relationship.get());
699               
700               } else {
701                 return Stream.empty();
702               }
703             } catch (ChampUnmarshallingException e) {
704               
705               LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN, 
706                   "Failed to unmarshall relationship" + e.getMessage());
707               return Stream.empty();
708             }
709           }
710          
711           final GraphTraversal<Edge, Edge> query = graphInstance.traversal().E();
712
713           for (Entry<String, Object> filter : queryParams.entrySet()) {
714             if (filter.getKey().equals(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
715               continue; //Add the label last for performance reasons
716             } else {
717               query.has(filter.getKey(), filter.getValue());
718             }
719           }
720
721           if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
722                   hasLabel(query, queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString()));
723           }
724
725           final Iterator<ChampRelationship> objIter = new Iterator<ChampRelationship> () {
726         
727             private ChampRelationship next;
728
729             @Override
730             public boolean hasNext() {
731               while (query.hasNext()) {
732                 try {
733                   next = getChampformer().unmarshallRelationship(query.next());
734                   return true;
735                 } catch (ChampUnmarshallingException e) {
736                   LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN,
737                       "Failed to unmarshall tinkerpop vertex during query, returning partial results" + e.getMessage());
738                 }                                       
739               }
740
741               // If we auto-created the transaction, then commit it now, otherwise it
742               // is up to the caller to decide if and when to commit.
743               if(!transaction.isPresent()) {
744                 try { 
745             tx.commit();  //Danger ahead if this iterator is not completely
746                           //consumed, then the transaction cache will be stale
747             
748           } catch (ChampTransactionException e) {
749             LOGGER.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_WARN,
750                 "Failed transaction commit due to " + e.getMessage());
751           } 
752                              
753               }
754                                         
755               next = null;
756               return false;
757             }
758
759             @Override
760             public ChampRelationship next() {
761               if (next == null) {
762                 throw new NoSuchElementException();
763               }
764                                 
765               return next;
766             }
767           };
768
769           return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
770                     objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
771         }
772
773     @Override
774     public Optional<ChampRelationship> retrieveRelationship(Object key)
775             throws ChampUnmarshallingException, ChampTransactionException {
776       return retrieveRelationship(key, Optional.empty());
777     }
778     
779     @Override
780     public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction)
781                         throws ChampUnmarshallingException, ChampTransactionException {
782           
783       if (isShutdown()) {
784         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
785       }
786
787       // If we were not provided a transaction object then automatically open a transaction
788       // now.
789       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
790       
791       // Use the graph instance associated with our transaction.
792       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
793       
794       final Iterator<Edge> edge = graphInstance.edges(key);
795       final Optional<ChampRelationship> optionalRelationship;
796
797       if (!edge.hasNext()) {
798         optionalRelationship = Optional.empty();
799       } else {
800         optionalRelationship = Optional.of(getChampformer().unmarshallRelationship(edge.next()));
801       }
802
803       // If we auto-created the transaction, then commit it now, otherwise it
804       // is up to the caller to decide if and when to commit.
805       if(!transaction.isPresent()) {
806         tx.commit();
807       }
808
809       return optionalRelationship;
810         }
811
812     @Override
813         public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
814
815       if (isShutdown()) {
816         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
817       }
818       
819       if (!relationship.getKey().isPresent()) {
820         throw new IllegalArgumentException("Key must be provided when deleting a relationship");
821       }
822
823       // If we were not provided a transaction object then automatically open a transaction
824       // now.
825       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
826       
827       // Use the graph instance associated with our transaction.
828       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
829       
830       final Iterator<Edge> edge = graphInstance.edges(relationship.getKey().get());
831                 
832       if (!edge.hasNext()) {
833         
834         // If we auto-created the transaction, then try to roll it back now, otherwise it
835         // is up to the caller to decide if and when to do so.
836         if(!transaction.isPresent()) {
837           tx.rollback();
838         }
839         
840         throw new ChampRelationshipNotExistsException();
841       }
842       
843           edge.next().remove();
844
845           // If we auto-created the transaction, then commit it now, otherwise it
846           // is up to the caller to decide if and when to commit.
847           if(!transaction.isPresent()) {
848             tx.commit();
849           }
850     }
851
852         
853     @Override
854     public ChampPartition doStorePartition(ChampPartition submittedPartition, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampTransactionException {
855                 
856           if (isShutdown()) {
857             throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
858           }
859
860       // If we were not provided a transaction object then automatically open a transaction
861       // now.
862       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
863             
864           try {
865             final HashMap<ChampObject, ChampObject> objectsWithKeys = new HashMap<ChampObject, ChampObject> ();
866             final CreateChampPartitionable storedPartition = ChampPartition.create();
867
868             for (ChampObject champObject : submittedPartition.getChampObjects()) {
869               final Vertex vertex = writeVertex(champObject, tx);
870               objectsWithKeys.put(champObject, ChampObject.create()
871                                                                                                                 .from(champObject)
872                                                                                                                 .withKey(vertex.id())
873                                                                                                                 .build());
874             }
875
876             for (ChampRelationship champRelationship : submittedPartition.getChampRelationships()) {
877               
878               if (!objectsWithKeys.containsKey(champRelationship.getSource())) {
879                 final Vertex vertex = writeVertex(champRelationship.getSource(), tx);
880
881                 objectsWithKeys.put(champRelationship.getSource(), ChampObject.create()
882                                                                                                                      .from(champRelationship.getSource())
883                                                                                                                      .withKey(vertex.id())
884                                                                                                                      .build());
885               }
886
887               if (!objectsWithKeys.containsKey(champRelationship.getTarget())) {
888                 final Vertex vertex = writeVertex(champRelationship.getTarget(), tx);
889
890                 objectsWithKeys.put(champRelationship.getTarget(), ChampObject.create()
891                                                                                                                         .from(champRelationship.getTarget())
892                                                                                                                         .withKey(vertex.id())
893                                                                                                                         .build());
894               }
895
896               final ChampRelationship.Builder relWithKeysBuilder = new ChampRelationship.Builder(objectsWithKeys.get(champRelationship.getSource()),
897                                                                                                                                                                                          objectsWithKeys.get(champRelationship.getTarget()),
898                                                                                                                                                                                          champRelationship.getType());
899
900               if (champRelationship.getKey().isPresent()) {
901                 relWithKeysBuilder.key(champRelationship.getKey().get());
902               }
903                                 
904               relWithKeysBuilder.properties(champRelationship.getProperties());
905
906               final Edge edge = writeEdge(relWithKeysBuilder.build(), tx);
907
908               storedPartition.withRelationship(ChampRelationship.create()
909                                                                                                                                 .from(champRelationship)
910                                                                                                                                 .withKey(edge.id())
911                                                                                                                                 .build());
912             }
913
914             for (ChampObject object : objectsWithKeys.values()) {
915               storedPartition.withObject(object);
916             }
917
918             // If we auto-created the transaction, then commit it now, otherwise it
919         // is up to the caller to decide if and when to commit.
920         if(!transaction.isPresent()) {
921           tx.commit();
922         }
923             
924         return storedPartition.build();
925                         
926       } catch (ChampObjectNotExistsException | ChampMarshallingException e) {
927         
928         // If we auto-created the transaction, then try to roll it back now, otherwise it
929         // is up to the caller to decide if and when to do so.
930         if(!transaction.isPresent()) {
931           tx.rollback();
932         }  
933
934                 throw e;
935       }
936         }
937
938     @Override
939         public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
940       
941       if (isShutdown()) {
942         throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
943       }
944
945       // If we were not provided a transaction object then automatically open a transaction
946       // now.
947       ChampTransaction tx = getOrCreateTransactionInstance(transaction);
948       
949       // Use the graph instance associated with our transaction.
950       Graph graphInstance = ((TinkerpopTransaction)tx).getGraphInstance();
951       
952       for (ChampObject champObject : graph.getChampObjects()) {
953         try {
954           final Object vertexId = champObject.getKey().get();
955           final Iterator<Vertex> vertex = graphInstance.vertices(vertexId);
956         
957           if (vertex.hasNext()) {
958             vertex.next().remove();
959           }
960                 } catch (NoSuchElementException e) {
961                   
962                   // If we auto-created the transaction, then try to roll it back now, otherwise it
963               // is up to the caller to decide if and when to do so.
964               if(!transaction.isPresent()) {
965                 tx.rollback();
966               } 
967
968               throw new IllegalArgumentException("Must pass a key to delete an object");
969             }
970       }
971
972       for (ChampRelationship champRelationship : graph.getChampRelationships()) {
973         try {
974           final Iterator<Edge> edge = graphInstance.edges(champRelationship.getKey().get());
975                 
976           if (edge.hasNext()) {
977             edge.next().remove();
978           }
979         } catch (NoSuchElementException e) {
980           
981           // If we auto-created the transaction, then try to roll it back now, otherwise it
982           // is up to the caller to decide if and when to do so.
983           if(!transaction.isPresent()) {
984             tx.rollback();
985           }
986
987           throw new IllegalArgumentException("Must pass a key to delete a relationship");
988         }
989       }
990
991       // If we auto-created the transaction, then commit it now, otherwise it
992       // is up to the caller to decide if and when to commit.
993       if(!transaction.isPresent()) {
994         tx.commit();
995       }
996         }
997
998         @Override
999         public void shutdown() {
1000
1001                 if (isShutdown.compareAndSet(false, true)) {
1002                   super.shutdown();
1003                         try {
1004                                 getGraph().close();
1005                         } catch (Throwable t) {
1006                                 LOGGER.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_TINKERPOP_CHAMP_GRAPH_ERROR,
1007                                     "Exception while shutting down graph" + t.getMessage());
1008                         }
1009                 } else {
1010                         throw new IllegalStateException("Cannot call shutdown() after shutdown() was already initiated");
1011                 }
1012         }
1013
1014         @Override
1015         public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1016                 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
1017
1018                 if (getGraph().features().graph().variables().supportsVariables()) {
1019                         try {
1020                                 getGraph().variables().set("schema", getObjectMapper().writeValueAsBytes(schema));
1021                         } catch (JsonProcessingException e) {
1022                                 throw new RuntimeException(e);
1023                         }
1024                 } else {
1025                         super.storeSchema(schema);
1026                 }
1027         }
1028
1029         @Override
1030         public ChampSchema retrieveSchema() {
1031                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveSchema() after shutdown has been initiated");
1032
1033                 if (getGraph().features().graph().variables().supportsVariables()) {
1034                         final Optional<byte[]> schema = getGraph().variables().get("schema");
1035
1036                         if (schema.isPresent()) {
1037                                 try {
1038                                         return getObjectMapper().readValue(schema.get(), ChampSchema.class);
1039                                 } catch (IOException e) {
1040                                         throw new RuntimeException(e);
1041                                 }
1042                         }
1043                 }
1044
1045                 return super.retrieveSchema();
1046         }
1047
1048         @Override
1049         public void deleteSchema() {
1050                 if (isShutdown()) throw new IllegalStateException("Cannot call deleteSchema() after shutdown has been initiated");
1051
1052                 if (getGraph().features().graph().variables().supportsVariables()) {
1053                         getGraph().variables().remove("schema");
1054                 } else {
1055                         super.deleteSchema();
1056                 }
1057         }
1058         
1059         public ChampTransaction getOrCreateTransactionInstance(Optional<ChampTransaction> transaction) {
1060           
1061           ChampTransaction tx = null;
1062           
1063       // If we were not provided a transaction object then automatically open a transaction
1064       // now.
1065       if(!transaction.isPresent()) {
1066         
1067           tx = new TinkerpopTransaction(getGraph());
1068           
1069       } else {
1070         tx = transaction.get();
1071       }
1072       
1073       return tx;
1074         }
1075 }