9ede7dfac306084fd5cefd180a38488e5f54ef3d
[aai/champ.git] / champ-lib / champ-janus / src / main / java / org / onap / aai / champjanus / graph / impl / JanusChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champjanus.graph.impl;
23
24 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
25 import org.apache.tinkerpop.gremlin.structure.Edge;
26 import org.apache.tinkerpop.gremlin.structure.Vertex;
27 import org.janusgraph.core.*;
28 import org.onap.aai.champcore.Formatter;
29 import org.janusgraph.core.schema.JanusGraphIndex;
30 import org.janusgraph.core.schema.JanusGraphManagement;
31 import org.janusgraph.core.schema.SchemaAction;
32 import org.janusgraph.core.schema.SchemaStatus;
33 import org.janusgraph.graphdb.database.management.ManagementSystem;
34 import org.onap.aai.champcore.ChampCapabilities;
35 import org.onap.aai.champcore.FormatMapper;
36 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
37 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
38 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
39 import org.onap.aai.champcore.model.*;
40 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
41 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import java.time.temporal.ChronoUnit;
46 import java.util.*;
47 import java.util.concurrent.ExecutionException;
48 import java.util.stream.Stream;
49 import java.util.stream.StreamSupport;
50
51 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
52   private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
53   private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
54   private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
55   private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
56   private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
57   private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
58
59   private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
60
61     @Override
62     public boolean canDeleteObjectIndices() {
63       return false;
64     }
65
66     @Override
67     public boolean canDeleteRelationshipIndices() {
68       return false;
69     }
70   };
71
72   private final JanusGraph graph;
73
74   public JanusChampGraphImpl(Builder builder) {
75     super(builder.graphConfiguration);
76     final JanusGraphFactory.Builder janusGraphBuilder = JanusGraphFactory.build();
77
78     for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
79       janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
80     }
81     
82     janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new Random().nextInt(Short.MAX_VALUE)+""));
83
84     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
85
86     if ("cassandra".equals(storageBackend) ||
87         "cassandrathrift".equals(storageBackend) ||
88         "astyanax".equals(storageBackend) ||
89         "embeddedcassandra".equals(storageBackend)) {
90         
91         janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
92     } else if ("hbase".equals(storageBackend)) {
93       janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
94     } else if ("berkleyje".equals(storageBackend)) {
95       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
96     } else if ("inmemory".equals(storageBackend)) {
97     } else {
98       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
99     }
100     
101     LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
102     this.graph = janusGraphBuilder.open();
103   }
104
105   public static class Builder {
106     private final String graphName;
107
108     private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
109
110     public Builder(String graphName) {
111       this.graphName = graphName;
112     }
113     
114     public Builder(String graphName, Map<String, Object> properties) {
115         this.graphName = graphName;
116         properties(properties);
117     }
118
119     public Builder properties(Map<String, Object> properties) {
120       if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
121         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
122             + " in initial configuration - this path is used"
123             + " to specify graph names");
124       }
125
126       this.graphConfiguration.putAll(properties);
127       return this;
128     }
129
130     public Builder property(String path, Object value) {
131       if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
132         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
133             + " in initial configuration - this path is used"
134             + " to specify graph names");
135       }
136       graphConfiguration.put(path, value);
137       return this;
138     }
139
140     public JanusChampGraphImpl build() {
141       return new JanusChampGraphImpl(this);
142     }
143   }
144
145   @Override
146   protected JanusGraph getGraph() {
147     return graph;
148   }
149
150  
151   @Override
152   protected ChampSchemaEnforcer getSchemaEnforcer() {
153     return SCHEMA_ENFORCER;
154   }
155
156   @Override
157   public void executeStoreObjectIndex(ChampObjectIndex index) {
158     if (isShutdown()) {
159       throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
160     }
161
162     final JanusGraph graph = getGraph();
163     final JanusGraphManagement createIndexMgmt = graph.openManagement();
164     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
165
166     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
167       createIndexMgmt.rollback();
168       return; //Ignore, index already exists
169     }
170
171     createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
172
173     createIndexMgmt.commit();
174     graph.tx().commit();
175
176     awaitIndexCreation(index.getName());
177   }
178
179   @Override
180   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
181     if (isShutdown()) {
182       throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
183     }
184
185     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
186     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
187
188     if (index == null) {
189       return Optional.empty();
190     }
191     if (index.getIndexedElement() != JanusGraphVertex.class) {
192       return Optional.empty();
193     }
194
195     return Optional.of(ChampObjectIndex.create()
196         .ofName(indexName)
197         .onType(ChampObject.ReservedTypes.ANY.toString())
198         .forField(index.getFieldKeys()[0].name())
199         .build());
200   }
201
202   @Override
203   public Stream<ChampObjectIndex> retrieveObjectIndices() {
204     if (isShutdown()) {
205       throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
206     }
207
208     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
209     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
210
211     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
212
213       private ChampObjectIndex next;
214
215       @Override
216       public boolean hasNext() {
217         if (indices.hasNext()) {
218           final JanusGraphIndex index = indices.next();
219
220           next = ChampObjectIndex.create()
221               .ofName(index.name())
222               .onType(ChampObject.ReservedTypes.ANY.toString())
223               .forField(index.getFieldKeys()[0].name())
224               .build();
225           return true;
226         }
227
228         next = null;
229         return false;
230       }
231
232       @Override
233       public ChampObjectIndex next() {
234         if (next == null) {
235           throw new NoSuchElementException();
236         }
237
238         return next;
239       }
240     };
241
242     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
243         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
244   }
245
246   @Override
247   public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
248     if (isShutdown()) {
249       throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
250     }
251
252     throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
253   }
254
255   @Override
256   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
257     if (isShutdown()) {
258       throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
259     }
260
261     final JanusGraph graph = getGraph();
262     final JanusGraphManagement createIndexMgmt = graph.openManagement();
263     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
264
265     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
266       return; //Ignore, index already exists
267     }
268     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
269
270     createIndexMgmt.commit();
271     graph.tx().commit();
272
273     awaitIndexCreation(index.getName());
274   }
275
276   @Override
277   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
278     if (isShutdown()) {
279       throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
280     }
281
282     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
283     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
284
285     if (index == null) {
286       return Optional.empty();
287     }
288     if (index.getIndexedElement() != JanusGraphEdge.class) {
289       return Optional.empty();
290     }
291
292     return Optional.of(ChampRelationshipIndex.create()
293         .ofName(indexName)
294         .onType(ChampObject.ReservedTypes.ANY.toString())
295         .forField(index.getFieldKeys()[0].name())
296         .build());
297   }
298
299   @Override
300   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
301     if (isShutdown()) {
302       throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
303     }
304
305     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
306     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
307
308     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
309
310       private ChampRelationshipIndex next;
311
312       @Override
313       public boolean hasNext() {
314         if (indices.hasNext()) {
315           final JanusGraphIndex index = indices.next();
316
317           next = ChampRelationshipIndex.create()
318               .ofName(index.name())
319               .onType(ChampRelationship.ReservedTypes.ANY.toString())
320               .forField(index.getFieldKeys()[0].name())
321               .build();
322           return true;
323         }
324
325         next = null;
326         return false;
327       }
328
329       @Override
330       public ChampRelationshipIndex next() {
331         if (next == null) {
332           throw new NoSuchElementException();
333         }
334
335         return next;
336       }
337     };
338
339     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
340         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
341   }
342
343   @Override
344   public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
345     if (isShutdown()) {
346       throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
347     }
348
349     throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
350   }
351
352   private Cardinality getJanusCardinality(ChampCardinality cardinality) {
353     switch (cardinality) {
354       case LIST:
355         return Cardinality.LIST;
356       case SET:
357         return Cardinality.SET;
358       case SINGLE:
359         return Cardinality.SINGLE;
360       default:
361         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
362     }
363   }
364
365   private void awaitIndexCreation(String indexName) {
366     //Wait for the index to become available
367     try {
368       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
369           .status(SchemaStatus.ENABLED)
370           .timeout(1, ChronoUnit.SECONDS)
371           .call()
372           .getSucceeded()) {
373         return; //Empty graphs immediately ENABLE indices
374       }
375
376       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
377           .status(SchemaStatus.REGISTERED)
378           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
379           .call()
380           .getSucceeded()) {
381         LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
382         return;
383       }
384     } catch (InterruptedException e) {
385       LOGGER.warn("Interrupted while waiting for object index creation status");
386       return;
387     }
388
389     //Reindex the existing data
390
391     try {
392       final JanusGraphManagement updateIndexMgmt = graph.openManagement();
393       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
394       updateIndexMgmt.commit();
395     } catch (InterruptedException e) {
396       LOGGER.warn("Interrupted while reindexing for object index");
397       return;
398     } catch (ExecutionException e) {
399       LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
400     }
401
402     try {
403       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
404           .status(SchemaStatus.ENABLED)
405           .timeout(10, ChronoUnit.MINUTES)
406           .call();
407     } catch (InterruptedException e) {
408       LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
409       return;
410     }
411   }
412
413   @Override
414   public ChampCapabilities capabilities() {
415     return CAPABILITIES;
416   }
417
418   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
419     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
420
421     final ChampSchema currentSchema = retrieveSchema();
422     final JanusGraphManagement mgmt = getGraph().openManagement();
423
424     try {
425       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
426         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
427           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
428
429           if (currentObjConstraint.isPresent()) {
430             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
431
432             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
433               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
434             }
435           }
436
437           final String newPropertyKeyName = propConstraint.getField().getName();
438
439           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
440
441           mgmt.makePropertyKey(newPropertyKeyName)
442               .dataType(propConstraint.getField().getJavaType())
443               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
444               .make();
445         }
446       }
447
448       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
449
450         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
451
452         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
453
454           if (currentRelConstraint.isPresent()) {
455             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
456
457             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
458               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
459             }
460           }
461
462           final String newPropertyKeyName = propConstraint.getField().getName();
463
464           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
465
466           mgmt.makePropertyKey(newPropertyKeyName)
467               .dataType(propConstraint.getField().getJavaType())
468               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
469               .make();
470         }
471
472         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
473
474         if (edgeLabel != null) {
475           mgmt.makeEdgeLabel(relConstraint.getType())
476               .directed()
477               .make();
478         }
479       }
480
481       mgmt.commit();
482
483       super.storeSchema(schema);
484     } catch (SchemaViolationException | ChampSchemaViolationException e) {
485       mgmt.rollback();
486       throw new ChampSchemaViolationException(e);
487     }
488   }
489   
490         public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
491                 return query.hasLabel(type);
492         }
493 }