Create vertex and edge indices on startup
[aai/champ.git] / champ-lib / champ-janus / src / main / java / org / onap / aai / champjanus / graph / impl / JanusChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champjanus.graph.impl;
22
23 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
24 import org.apache.tinkerpop.gremlin.structure.Edge;
25 import org.apache.tinkerpop.gremlin.structure.Vertex;
26 import org.janusgraph.core.*;
27 import org.janusgraph.core.schema.JanusGraphIndex;
28 import org.janusgraph.core.schema.JanusGraphManagement;
29 import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
30 import org.janusgraph.core.schema.SchemaAction;
31 import org.janusgraph.core.schema.SchemaStatus;
32 import org.janusgraph.graphdb.database.management.ManagementSystem;
33 import org.onap.aai.champcore.ChampCapabilities;
34 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
35 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
36 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
37 import org.onap.aai.champcore.model.*;
38 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
39 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import java.security.SecureRandom;
44 import java.time.temporal.ChronoUnit;
45 import java.util.*;
46 import java.util.concurrent.ExecutionException;
47 import java.util.stream.Stream;
48 import java.util.stream.StreamSupport;
49
50 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
  /** Class-wide SLF4J logger. */
  private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
  /** Janus configuration path the constructor sets to the graph name for Cassandra-family backends. */
  private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
  /** Janus configuration path the constructor sets to the graph name for the HBase backend. */
  private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
  /** Janus configuration path the constructor fills with a random numeric suffix. */
  private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
  /** Shared schema enforcer handed out by {@link #getSchemaEnforcer()}. */
  private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
  /** Seconds to wait for a freshly created index to reach the REGISTERED status. */
  private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 45;

  /** Capabilities of this implementation: neither object nor relationship indices can be deleted. */
  private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {

    @Override
    public boolean canDeleteObjectIndices() {
      return false;
    }

    @Override
    public boolean canDeleteRelationshipIndices() {
      return false;
    }
  };

  /** Graph handle; stays {@code null} until {@link #openGraph()} succeeds. */
  private JanusGraph graph;
  /** Pre-configured factory builder used by {@link #openGraph()} to open the graph. */
  private final JanusGraphFactory.Builder janusGraphBuilder;
73
74   public JanusChampGraphImpl(Builder builder) {
75     super(builder.graphConfiguration);
76     janusGraphBuilder = JanusGraphFactory.build();
77
78     for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
79       janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
80     }
81     
82     janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE)+""));
83
84     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
85
86     if ("cassandra".equals(storageBackend) ||
87         "cassandrathrift".equals(storageBackend) ||
88         "astyanax".equals(storageBackend) ||
89         "embeddedcassandra".equals(storageBackend)) {
90         
91         janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
92     } else if ("hbase".equals(storageBackend)) {
93       janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
94     } else if ("berkleyje".equals(storageBackend)) {
95       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
96     } else if ("inmemory".equals(storageBackend)) {
97     } else {
98       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
99     }
100     
101     try {
102       openGraph();
103     }
104     catch (Exception ex) {
105       // Swallow exception.  Cassandra may not be reachable.  Will retry next time we need to use the graph.
106       LOGGER.error("Error opening graph: " + ex.getMessage());
107       return;
108     }
109     
110     LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
111   }
112
113   public static class Builder {
114     private final String graphName;
115
116     private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
117
118     public Builder(String graphName) {
119       this.graphName = graphName;
120     }
121     
122     public Builder(String graphName, Map<String, Object> properties) {
123         this.graphName = graphName;
124         properties(properties);
125     }
126
127     public Builder properties(Map<String, Object> properties) {
128       if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
129         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
130             + " in initial configuration - this path is used"
131             + " to specify graph names");
132       }
133
134       this.graphConfiguration.putAll(properties);
135       return this;
136     }
137
138     public Builder property(String path, Object value) {
139       if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
140         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
141             + " in initial configuration - this path is used"
142             + " to specify graph names");
143       }
144       graphConfiguration.put(path, value);
145       return this;
146     }
147
148     public JanusChampGraphImpl build() {
149       return new JanusChampGraphImpl(this);
150     }
151   }
152
153   @Override
154   protected JanusGraph getGraph() {
155     if (graph == null) {
156       openGraph();
157     }
158     return graph;
159   }
160
161  
  /**
   * @return the schema enforcer shared by all instances of this class
   */
  @Override
  protected ChampSchemaEnforcer getSchemaEnforcer() {
    return SCHEMA_ENFORCER;
  }
166
167   @Override
168   public void executeStoreObjectIndex(ChampObjectIndex index) {
169     if (isShutdown()) {
170       throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
171     }
172
173     final JanusGraph graph = getGraph();
174     final JanusGraphManagement createIndexMgmt = graph.openManagement();
175
176     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
177       createIndexMgmt.rollback();
178       LOGGER.info("Index " + index.getName() + " already exists");
179       return; //Ignore, index already exists
180     }
181
182     LOGGER.info("Create index " + index.getName());
183     IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
184     for (ChampField field : index.getFields()) {
185       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
186       ib = ib.addKey(pk);
187     }
188     ib.buildCompositeIndex();
189
190     createIndexMgmt.commit();
191     graph.tx().commit();
192
193     awaitIndexCreation(index.getName());
194   }
195
196   @Override
197   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
198     if (isShutdown()) {
199       throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
200     }
201
202     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
203     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
204
205     if (index == null) {
206       return Optional.empty();
207     }
208     if (index.getIndexedElement() != JanusGraphVertex.class) {
209       return Optional.empty();
210     }
211
212     List<String> fieldNames = new ArrayList<String>();
213     for (int i = 0; i < index.getFieldKeys().length; i++) {
214       fieldNames.add(index.getFieldKeys()[i].name());
215     }
216     
217     return Optional.of(ChampObjectIndex.create()
218         .ofName(indexName)
219         .onType(ChampObject.ReservedTypes.ANY.toString())
220         .forFields(fieldNames)
221         .build());
222   }
223
224   @Override
225   public Stream<ChampObjectIndex> retrieveObjectIndices() {
226     if (isShutdown()) {
227       throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
228     }
229
230     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
231     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
232
233     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
234
235       private ChampObjectIndex next;
236
237       @Override
238       public boolean hasNext() {
239         if (indices.hasNext()) {
240           final JanusGraphIndex index = indices.next();
241
242           List<String> fieldNames = new ArrayList<String>();
243           for (int i = 0; i < index.getFieldKeys().length; i++) {
244             fieldNames.add(index.getFieldKeys()[i].name());
245           }
246           
247           next = ChampObjectIndex.create()
248               .ofName(index.name())
249               .onType(ChampObject.ReservedTypes.ANY.toString())
250               .forFields(fieldNames)
251               .build();
252           return true;
253         }
254
255         next = null;
256         return false;
257       }
258
259       @Override
260       public ChampObjectIndex next() {
261         if (next == null) {
262           throw new NoSuchElementException();
263         }
264
265         return next;
266       }
267     };
268
269     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
270         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
271   }
272
  /**
   * Object index deletion is not supported by this implementation.
   *
   * @param indexName name of the index to delete (unused)
   * @throws IllegalStateException if shutdown has been initiated
   * @throws UnsupportedOperationException otherwise, always
   */
  @Override
  public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
    if (isShutdown()) {
      throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
    }

    throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
  }
281
282   @Override
283   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
284     if (isShutdown()) {
285       throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
286     }
287
288     final JanusGraph graph = getGraph();
289     final JanusGraphManagement createIndexMgmt = graph.openManagement();
290     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
291
292     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
293       return; //Ignore, index already exists
294     }
295     
296     LOGGER.info("Create edge index " + index.getName());
297     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
298
299     createIndexMgmt.commit();
300     graph.tx().commit();
301
302     awaitIndexCreation(index.getName());
303   }
304
305   @Override
306   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
307     if (isShutdown()) {
308       throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
309     }
310
311     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
312     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
313
314     if (index == null) {
315       return Optional.empty();
316     }
317     if (index.getIndexedElement() != JanusGraphEdge.class) {
318       return Optional.empty();
319     }
320
321     return Optional.of(ChampRelationshipIndex.create()
322         .ofName(indexName)
323         .onType(ChampObject.ReservedTypes.ANY.toString())
324         .forField(index.getFieldKeys()[0].name())
325         .build());
326   }
327
328   @Override
329   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
330     if (isShutdown()) {
331       throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
332     }
333
334     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
335     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
336
337     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
338
339       private ChampRelationshipIndex next;
340
341       @Override
342       public boolean hasNext() {
343         if (indices.hasNext()) {
344           final JanusGraphIndex index = indices.next();
345
346           next = ChampRelationshipIndex.create()
347               .ofName(index.name())
348               .onType(ChampRelationship.ReservedTypes.ANY.toString())
349               .forField(index.getFieldKeys()[0].name())
350               .build();
351           return true;
352         }
353
354         next = null;
355         return false;
356       }
357
358       @Override
359       public ChampRelationshipIndex next() {
360         if (next == null) {
361           throw new NoSuchElementException();
362         }
363
364         return next;
365       }
366     };
367
368     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
369         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
370   }
371
  /**
   * Relationship index deletion is not supported by this implementation.
   *
   * @param indexName name of the index to delete (unused)
   * @throws IllegalStateException if shutdown has been initiated
   * @throws UnsupportedOperationException otherwise, always
   */
  @Override
  public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
    if (isShutdown()) {
      throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
    }

    throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
  }
380
381   private Cardinality getJanusCardinality(ChampCardinality cardinality) {
382     switch (cardinality) {
383       case LIST:
384         return Cardinality.LIST;
385       case SET:
386         return Cardinality.SET;
387       case SINGLE:
388         return Cardinality.SINGLE;
389       default:
390         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
391     }
392   }
393
394   private void awaitIndexCreation(String indexName) {
395     //Wait for the index to become available
396     try {
397       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
398           .status(SchemaStatus.ENABLED)
399           .timeout(1, ChronoUnit.SECONDS)
400           .call()
401           .getSucceeded()) {
402         return; //Empty graphs immediately ENABLE indices
403       }
404
405       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
406           .status(SchemaStatus.REGISTERED)
407           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
408           .call()
409           .getSucceeded()) {
410         LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
411         return;
412       }
413     } catch (InterruptedException e) {
414       LOGGER.warn("Interrupted while waiting for object index creation status");
415       Thread.currentThread().interrupt();
416       return;
417     }
418
419     //Reindex the existing data
420
421     try {
422       final JanusGraphManagement updateIndexMgmt = graph.openManagement();
423       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
424       updateIndexMgmt.commit();
425     } catch (InterruptedException e) {
426       LOGGER.warn("Interrupted while reindexing for object index");
427       Thread.currentThread().interrupt();
428       return;
429     } catch (ExecutionException e) {
430       LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
431     }
432
433     try {
434       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
435           .status(SchemaStatus.ENABLED)
436           .timeout(2, ChronoUnit.MINUTES)
437           .call();
438     } catch (InterruptedException e) {
439       LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
440       Thread.currentThread().interrupt();
441       return;
442     }
443   }
444
  /**
   * @return the capability descriptor of this implementation, which reports that neither object
   *         nor relationship indices can be deleted
   */
  @Override
  public ChampCapabilities capabilities() {
    return CAPABILITIES;
  }
449
450   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
451     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
452
453     final ChampSchema currentSchema = retrieveSchema();
454     final JanusGraphManagement mgmt = getGraph().openManagement();
455
456     try {
457       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
458         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
459           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
460
461           if (currentObjConstraint.isPresent()) {
462             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
463
464             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
465               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
466             }
467           }
468
469           final String newPropertyKeyName = propConstraint.getField().getName();
470
471           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
472
473           mgmt.makePropertyKey(newPropertyKeyName)
474               .dataType(propConstraint.getField().getJavaType())
475               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
476               .make();
477         }
478       }
479
480       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
481
482         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
483
484         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
485
486           if (currentRelConstraint.isPresent()) {
487             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
488
489             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
490               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
491             }
492           }
493
494           final String newPropertyKeyName = propConstraint.getField().getName();
495
496           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
497
498           mgmt.makePropertyKey(newPropertyKeyName)
499               .dataType(propConstraint.getField().getJavaType())
500               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
501               .make();
502         }
503
504         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
505
506         if (edgeLabel != null) {
507           mgmt.makeEdgeLabel(relConstraint.getType())
508               .directed()
509               .make();
510         }
511       }
512
513       mgmt.commit();
514
515       super.storeSchema(schema);
516     } catch (SchemaViolationException | ChampSchemaViolationException e) {
517       mgmt.rollback();
518       throw new ChampSchemaViolationException(e);
519     }
520   }
521   
522   private synchronized void openGraph() {
523     if (graph == null) {
524       graph = janusGraphBuilder.open();
525     }
526   }
527   
  /**
   * Adds a label filter to the given traversal by delegating to its own {@code hasLabel}.
   *
   * @param query traversal to restrict
   * @param type label value passed through to the traversal
   * @return the traversal returned by {@code query.hasLabel(type)}
   */
  public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
    return query.hasLabel(type);
  }
531
532
533   @Override
534   public void createDefaultIndexes() {
535     if (isShutdown()) {
536       throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
537     }
538
539     final String EDGE_IX_NAME = "rel-key-uuid";
540     
541     final JanusGraph graph = getGraph();
542     JanusGraphManagement createIndexMgmt = graph.openManagement();
543     
544     boolean vertexIndexExists = (createIndexMgmt.getGraphIndex(KEY_PROPERTY_NAME) != null);
545     boolean edgeIndexExists = (createIndexMgmt.getGraphIndex(EDGE_IX_NAME) != null);
546     boolean nodeTypeIndexExists = (createIndexMgmt.getGraphIndex(NODE_TYPE_PROPERTY_NAME) != null);
547     
548     if (!vertexIndexExists || !edgeIndexExists) {
549       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(KEY_PROPERTY_NAME);
550       
551       if (!vertexIndexExists) {
552         LOGGER.info("Create Index " + KEY_PROPERTY_NAME);
553         createIndexMgmt.buildIndex(KEY_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
554       }
555       if (!edgeIndexExists) {
556         LOGGER.info("Create Index " + EDGE_IX_NAME);
557         createIndexMgmt.buildIndex(EDGE_IX_NAME, Edge.class).addKey(pk).buildCompositeIndex();
558       }
559       createIndexMgmt.commit();
560
561       if (!vertexIndexExists) {
562         awaitIndexCreation(KEY_PROPERTY_NAME);
563       }
564       if (!edgeIndexExists) {
565         awaitIndexCreation(EDGE_IX_NAME);
566       }
567     }
568     else {
569       createIndexMgmt.rollback();
570       LOGGER.info("Index " + KEY_PROPERTY_NAME + " and " + EDGE_IX_NAME + " already exist");
571     }
572     
573     
574     
575     if (!nodeTypeIndexExists) {
576       LOGGER.info("Create Index " + NODE_TYPE_PROPERTY_NAME);
577       createIndexMgmt = graph.openManagement();
578       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(NODE_TYPE_PROPERTY_NAME);
579       createIndexMgmt.buildIndex(NODE_TYPE_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
580       createIndexMgmt.commit();
581       awaitIndexCreation(NODE_TYPE_PROPERTY_NAME);
582     }    
583   }
584 }