Merge "Added sonar fixes"
[aai/champ.git] / champ-lib / champ-janus / src / main / java / org / onap / aai / champjanus / graph / impl / JanusChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champjanus.graph.impl;
22
23 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
24 import org.apache.tinkerpop.gremlin.structure.Edge;
25 import org.apache.tinkerpop.gremlin.structure.Vertex;
26 import org.janusgraph.core.*;
27 import org.janusgraph.core.schema.JanusGraphIndex;
28 import org.janusgraph.core.schema.JanusGraphManagement;
29 import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
30 import org.janusgraph.core.schema.SchemaAction;
31 import org.janusgraph.core.schema.SchemaStatus;
32 import org.janusgraph.graphdb.database.management.ManagementSystem;
33 import org.onap.aai.champcore.ChampCapabilities;
34 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
35 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
36 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
37 import org.onap.aai.champcore.model.*;
38 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
39 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import java.security.SecureRandom;
44 import java.time.temporal.ChronoUnit;
45 import java.util.*;
46 import java.util.concurrent.ExecutionException;
47 import java.util.stream.Stream;
48 import java.util.stream.StreamSupport;
49
50 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
51   private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
52   private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
53   private static final String JANUS_CQL_KEYSPACE = "storage.cql.keyspace";
54   private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
55   private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
56   private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
57   private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 45;
58
59   private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
60
61     @Override
62     public boolean canDeleteObjectIndices() {
63       return false;
64     }
65
66     @Override
67     public boolean canDeleteRelationshipIndices() {
68       return false;
69     }
70   };
71
72   private JanusGraph graph;
73   private final JanusGraphFactory.Builder janusGraphBuilder;
74
75   public JanusChampGraphImpl(Builder builder) {
76     super(builder.graphConfiguration);
77     janusGraphBuilder = JanusGraphFactory.build();
78
79     for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
80       janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
81     }
82     
83     janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE)+""));
84
85     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
86
87     if ("cassandra".equals(storageBackend) ||
88         "cassandrathrift".equals(storageBackend) ||
89         "astyanax".equals(storageBackend) ||
90         "embeddedcassandra".equals(storageBackend)) {
91
92       janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
93     } else if("cql".equals(storageBackend)){
94       janusGraphBuilder.set(JANUS_CQL_KEYSPACE, builder.graphName);
95     } else if ("hbase".equals(storageBackend)) {
96       janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
97     } else if ("berkleyje".equals(storageBackend)) {
98       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
99     } else if ("inmemory".equals(storageBackend)) {
100     } else {
101       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
102     }
103     
104     try {
105       openGraph();
106     }
107     catch (Exception ex) {
108       // Swallow exception.  Cassandra may not be reachable.  Will retry next time we need to use the graph.
109       LOGGER.error("Error opening graph: " + ex.getMessage());
110       return;
111     }
112     
113     LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
114   }
115
116   public static class Builder {
117     private final String graphName;
118
119     private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
120
121     public Builder(String graphName) {
122       this.graphName = graphName;
123     }
124     
125     public Builder(String graphName, Map<String, Object> properties) {
126         this.graphName = graphName;
127         properties(properties);
128     }
129
130     public Builder properties(Map<String, Object> properties) {
131       if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
132         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
133             + " in initial configuration - this path is used"
134             + " to specify graph names");
135       }
136
137       this.graphConfiguration.putAll(properties);
138       return this;
139     }
140
141     public Builder property(String path, Object value) {
142       if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
143         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
144             + " in initial configuration - this path is used"
145             + " to specify graph names");
146       }
147       graphConfiguration.put(path, value);
148       return this;
149     }
150
151     public JanusChampGraphImpl build() {
152       return new JanusChampGraphImpl(this);
153     }
154   }
155
156   @Override
157   protected JanusGraph getGraph() {
158     if (graph == null) {
159       openGraph();
160     }
161     return graph;
162   }
163
164  
165   @Override
166   protected ChampSchemaEnforcer getSchemaEnforcer() {
167     return SCHEMA_ENFORCER;
168   }
169
170   @Override
171   public void executeStoreObjectIndex(ChampObjectIndex index) {
172     if (isShutdown()) {
173       throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
174     }
175
176     final JanusGraph graph = getGraph();
177     final JanusGraphManagement createIndexMgmt = graph.openManagement();
178
179     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
180       createIndexMgmt.rollback();
181       LOGGER.info("Index " + index.getName() + " already exists");
182       return; //Ignore, index already exists
183     }
184
185     LOGGER.info("Create index " + index.getName());
186     IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
187     for (ChampField field : index.getFields()) {
188       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
189       ib = ib.addKey(pk);
190     }
191     ib.buildCompositeIndex();
192
193     createIndexMgmt.commit();
194     graph.tx().commit();
195
196     awaitIndexCreation(index.getName());
197   }
198
199   @Override
200   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
201     if (isShutdown()) {
202       throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
203     }
204
205     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
206     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
207
208     if (index == null) {
209       return Optional.empty();
210     }
211     if (index.getIndexedElement() != JanusGraphVertex.class) {
212       return Optional.empty();
213     }
214
215     List<String> fieldNames = new ArrayList<String>();
216     for (int i = 0; i < index.getFieldKeys().length; i++) {
217       fieldNames.add(index.getFieldKeys()[i].name());
218     }
219     
220     return Optional.of(ChampObjectIndex.create()
221         .ofName(indexName)
222         .onType(ChampObject.ReservedTypes.ANY.toString())
223         .forFields(fieldNames)
224         .build());
225   }
226
227   @Override
228   public Stream<ChampObjectIndex> retrieveObjectIndices() {
229     if (isShutdown()) {
230       throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
231     }
232
233     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
234     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
235
236     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
237
238       private ChampObjectIndex next;
239
240       @Override
241       public boolean hasNext() {
242         if (indices.hasNext()) {
243           final JanusGraphIndex index = indices.next();
244
245           List<String> fieldNames = new ArrayList<String>();
246           for (int i = 0; i < index.getFieldKeys().length; i++) {
247             fieldNames.add(index.getFieldKeys()[i].name());
248           }
249           
250           next = ChampObjectIndex.create()
251               .ofName(index.name())
252               .onType(ChampObject.ReservedTypes.ANY.toString())
253               .forFields(fieldNames)
254               .build();
255           return true;
256         }
257
258         next = null;
259         return false;
260       }
261
262       @Override
263       public ChampObjectIndex next() {
264         if (next == null) {
265           throw new NoSuchElementException();
266         }
267
268         return next;
269       }
270     };
271
272     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
273         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
274   }
275
276   @Override
277   public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
278     if (isShutdown()) {
279       throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
280     }
281
282     throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
283   }
284
285   @Override
286   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
287     if (isShutdown()) {
288       throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
289     }
290
291     final JanusGraph graph = getGraph();
292     final JanusGraphManagement createIndexMgmt = graph.openManagement();
293     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
294
295     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
296       return; //Ignore, index already exists
297     }
298     
299     LOGGER.info("Create edge index " + index.getName());
300     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
301
302     createIndexMgmt.commit();
303     graph.tx().commit();
304
305     awaitIndexCreation(index.getName());
306   }
307
308   @Override
309   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
310     if (isShutdown()) {
311       throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
312     }
313
314     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
315     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
316
317     if (index == null) {
318       return Optional.empty();
319     }
320     if (index.getIndexedElement() != JanusGraphEdge.class) {
321       return Optional.empty();
322     }
323
324     return Optional.of(ChampRelationshipIndex.create()
325         .ofName(indexName)
326         .onType(ChampObject.ReservedTypes.ANY.toString())
327         .forField(index.getFieldKeys()[0].name())
328         .build());
329   }
330
331   @Override
332   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
333     if (isShutdown()) {
334       throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
335     }
336
337     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
338     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
339
340     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
341
342       private ChampRelationshipIndex next;
343
344       @Override
345       public boolean hasNext() {
346         if (indices.hasNext()) {
347           final JanusGraphIndex index = indices.next();
348
349           next = ChampRelationshipIndex.create()
350               .ofName(index.name())
351               .onType(ChampRelationship.ReservedTypes.ANY.toString())
352               .forField(index.getFieldKeys()[0].name())
353               .build();
354           return true;
355         }
356
357         next = null;
358         return false;
359       }
360
361       @Override
362       public ChampRelationshipIndex next() {
363         if (next == null) {
364           throw new NoSuchElementException();
365         }
366
367         return next;
368       }
369     };
370
371     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
372         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
373   }
374
375   @Override
376   public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
377     if (isShutdown()) {
378       throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
379     }
380
381     throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
382   }
383
384   private Cardinality getJanusCardinality(ChampCardinality cardinality) {
385     switch (cardinality) {
386       case LIST:
387         return Cardinality.LIST;
388       case SET:
389         return Cardinality.SET;
390       case SINGLE:
391         return Cardinality.SINGLE;
392       default:
393         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
394     }
395   }
396
397   private void awaitIndexCreation(String indexName) {
398     //Wait for the index to become available
399     try {
400       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
401           .status(SchemaStatus.ENABLED)
402           .timeout(1, ChronoUnit.SECONDS)
403           .call()
404           .getSucceeded()) {
405         return; //Empty graphs immediately ENABLE indices
406       }
407
408       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
409           .status(SchemaStatus.REGISTERED)
410           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
411           .call()
412           .getSucceeded()) {
413         LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
414         return;
415       }
416     } catch (InterruptedException e) {
417       LOGGER.warn("Interrupted while waiting for object index creation status");
418       Thread.currentThread().interrupt();
419       return;
420     }
421
422     //Reindex the existing data
423
424     try {
425       final JanusGraphManagement updateIndexMgmt = graph.openManagement();
426       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
427       updateIndexMgmt.commit();
428     } catch (InterruptedException e) {
429       LOGGER.warn("Interrupted while reindexing for object index");
430       Thread.currentThread().interrupt();
431       return;
432     } catch (ExecutionException e) {
433       LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
434     }
435
436     try {
437       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
438           .status(SchemaStatus.ENABLED)
439           .timeout(2, ChronoUnit.MINUTES)
440           .call();
441     } catch (InterruptedException e) {
442       LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
443       Thread.currentThread().interrupt();
444       return;
445     }
446   }
447
448   @Override
449   public ChampCapabilities capabilities() {
450     return CAPABILITIES;
451   }
452
453   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
454     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
455
456     final ChampSchema currentSchema = retrieveSchema();
457     final JanusGraphManagement mgmt = getGraph().openManagement();
458
459     try {
460       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
461         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
462           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
463
464           if (currentObjConstraint.isPresent()) {
465             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
466
467             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
468               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
469             }
470           }
471
472           final String newPropertyKeyName = propConstraint.getField().getName();
473
474           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
475
476           mgmt.makePropertyKey(newPropertyKeyName)
477               .dataType(propConstraint.getField().getJavaType())
478               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
479               .make();
480         }
481       }
482
483       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
484
485         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
486
487         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
488
489           if (currentRelConstraint.isPresent()) {
490             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
491
492             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
493               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
494             }
495           }
496
497           final String newPropertyKeyName = propConstraint.getField().getName();
498
499           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
500
501           mgmt.makePropertyKey(newPropertyKeyName)
502               .dataType(propConstraint.getField().getJavaType())
503               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
504               .make();
505         }
506
507         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
508
509         if (edgeLabel != null) {
510           mgmt.makeEdgeLabel(relConstraint.getType())
511               .directed()
512               .make();
513         }
514       }
515
516       mgmt.commit();
517
518       super.storeSchema(schema);
519     } catch (SchemaViolationException | ChampSchemaViolationException e) {
520       mgmt.rollback();
521       throw new ChampSchemaViolationException(e);
522     }
523   }
524   
525   private synchronized void openGraph() {
526     if (graph == null) {
527       graph = janusGraphBuilder.open();
528     }
529   }
530   
531   public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
532     return query.hasLabel(type);
533   }
534
535
536   @Override
537   public void createDefaultIndexes() {
538     if (isShutdown()) {
539       throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
540     }
541
542     final String EDGE_IX_NAME = "rel-key-uuid";
543     
544     final JanusGraph graph = getGraph();
545     JanusGraphManagement createIndexMgmt = graph.openManagement();
546     
547     boolean vertexIndexExists = (createIndexMgmt.getGraphIndex(KEY_PROPERTY_NAME) != null);
548     boolean edgeIndexExists = (createIndexMgmt.getGraphIndex(EDGE_IX_NAME) != null);
549     boolean nodeTypeIndexExists = (createIndexMgmt.getGraphIndex(NODE_TYPE_PROPERTY_NAME) != null);
550     
551     if (!vertexIndexExists || !edgeIndexExists) {
552       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(KEY_PROPERTY_NAME);
553       
554       if (!vertexIndexExists) {
555         LOGGER.info("Create Index " + KEY_PROPERTY_NAME);
556         createIndexMgmt.buildIndex(KEY_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
557       }
558       if (!edgeIndexExists) {
559         LOGGER.info("Create Index " + EDGE_IX_NAME);
560         createIndexMgmt.buildIndex(EDGE_IX_NAME, Edge.class).addKey(pk).buildCompositeIndex();
561       }
562       createIndexMgmt.commit();
563
564       if (!vertexIndexExists) {
565         awaitIndexCreation(KEY_PROPERTY_NAME);
566       }
567       if (!edgeIndexExists) {
568         awaitIndexCreation(EDGE_IX_NAME);
569       }
570     }
571     else {
572       createIndexMgmt.rollback();
573       LOGGER.info("Index " + KEY_PROPERTY_NAME + " and " + EDGE_IX_NAME + " already exist");
574     }
575     
576     
577     
578     if (!nodeTypeIndexExists) {
579       LOGGER.info("Create Index " + NODE_TYPE_PROPERTY_NAME);
580       createIndexMgmt = graph.openManagement();
581       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(NODE_TYPE_PROPERTY_NAME);
582       createIndexMgmt.buildIndex(NODE_TYPE_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
583       createIndexMgmt.commit();
584       awaitIndexCreation(NODE_TYPE_PROPERTY_NAME);
585     }    
586   }
587 }