293b51c24d1bde48e577ed99e978f48afca3db75
[aai/champ.git] / champ-lib / champ-titan / src / main / java / org / onap / aai / champtitan / graph / impl / TitanChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champtitan.graph.impl;
22
23 import java.security.SecureRandom;
24 import java.time.temporal.ChronoUnit;
25 import java.util.*;
26 import java.util.Map.Entry;
27 import java.util.concurrent.ExecutionException;
28 import java.util.stream.Stream;
29 import java.util.stream.StreamSupport;
30
31 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
32 import org.apache.tinkerpop.gremlin.structure.Edge;
33 import org.apache.tinkerpop.gremlin.structure.Vertex;
34 import org.onap.aai.champcore.ChampCapabilities;
35 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
36 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
37 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
38 import org.onap.aai.champcore.model.ChampCardinality;
39 import org.onap.aai.champcore.model.ChampField;
40 import org.onap.aai.champcore.model.ChampObject;
41 import org.onap.aai.champcore.model.ChampObjectConstraint;
42 import org.onap.aai.champcore.model.ChampObjectIndex;
43 import org.onap.aai.champcore.model.ChampPropertyConstraint;
44 import org.onap.aai.champcore.model.ChampRelationship;
45 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
46 import org.onap.aai.champcore.model.ChampRelationshipIndex;
47 import org.onap.aai.champcore.model.ChampSchema;
48 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
49 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import com.thinkaurelius.titan.core.Cardinality;
54 import com.thinkaurelius.titan.core.EdgeLabel;
55 import com.thinkaurelius.titan.core.PropertyKey;
56 import com.thinkaurelius.titan.core.SchemaViolationException;
57 import com.thinkaurelius.titan.core.TitanEdge;
58 import com.thinkaurelius.titan.core.TitanFactory;
59 import com.thinkaurelius.titan.core.TitanGraph;
60 import com.thinkaurelius.titan.core.TitanVertex;
61 import com.thinkaurelius.titan.core.schema.SchemaAction;
62 import com.thinkaurelius.titan.core.schema.SchemaStatus;
63 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
64 import com.thinkaurelius.titan.core.schema.TitanManagement;
65 import com.thinkaurelius.titan.core.schema.TitanManagement.IndexBuilder;
66 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
67
68 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
69
70   private static final Logger LOGGER = LoggerFactory.getLogger(TitanChampGraphImpl.class);
71   private static final String TITAN_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
72   private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
73   private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
74   private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
75   private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
76
77   private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
78
79     @Override
80     public boolean canDeleteObjectIndices() {
81       return false;
82     }
83
84     @Override
85     public boolean canDeleteRelationshipIndices() {
86       return false;
87     }
88   };
89
90   private final TitanGraph graph;
91
92   public TitanChampGraphImpl(Builder builder) {
93     super(builder.graphConfiguration);
94     final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();
95
96     for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
97       titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
98     }
99
100     titanGraphBuilder.set(TITAN_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE)+""));
101     
102     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
103
104     if ("cassandra".equals(storageBackend) ||
105         "cassandrathrift".equals(storageBackend) ||
106         "astyanax".equals(storageBackend) ||
107         "embeddedcassandra".equals(storageBackend)) {
108       titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
109     } else if ("hbase".equals(storageBackend)) {
110       titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
111     } else if ("berkleyje".equals(storageBackend)) {
112       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
113     } else if ("inmemory".equals(storageBackend)) {
114     } else {
115       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
116     }
117     
118     LOGGER.info("Instantiated data access layer for Titan graph data store with backend: " + storageBackend);
119
120     this.graph = titanGraphBuilder.open();
121   }
122
123   public static class Builder {
124     private final String graphName;
125
126     private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
127
128     public Builder(String graphName) {
129       this.graphName = graphName;
130     }
131     
132     public Builder(String graphName, Map<String, Object> properties) {
133         this.graphName = graphName;
134         properties(properties);
135     }
136
137     public Builder properties(Map<String, Object> properties) {
138       if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
139         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
140             + " in initial configuration - this path is used"
141             + " to specify graph names");
142
143       this.graphConfiguration.putAll(properties);
144       return this;
145     }
146
147     public Builder property(String path, Object value) {
148       if (path.equals(TITAN_CASSANDRA_KEYSPACE))
149         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
150             + " in initial configuration - this path is used"
151             + " to specify graph names");
152       graphConfiguration.put(path, value);
153       return this;
154     }
155
156     public TitanChampGraphImpl build() {
157       return new TitanChampGraphImpl(this);
158     }
159   }
160
161   @Override
162   protected TitanGraph getGraph() {
163     return graph;
164   }
165
166   @Override
167   protected ChampSchemaEnforcer getSchemaEnforcer() {
168     return SCHEMA_ENFORCER;
169   }
170
171   public void executeStoreObjectIndex(ChampObjectIndex index) {
172     if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
173
174     final TitanGraph graph = getGraph();
175     final TitanManagement createIndexMgmt = graph.openManagement();
176
177     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
178       createIndexMgmt.rollback();
179       return; //Ignore, index already exists
180     }
181
182     IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
183     for (ChampField field : index.getFields()) {
184       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
185       ib = ib.addKey(pk);
186     }
187     ib.buildCompositeIndex();
188     createIndexMgmt.commit();
189     graph.tx().commit();
190
191     awaitIndexCreation(index.getName());
192   }
193
194   @Override
195   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
196     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
197
198     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
199     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
200
201     if (index == null) return Optional.empty();
202     if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
203     List<String> fieldNames = new ArrayList<String>();
204     for (int i = 0; i < index.getFieldKeys().length; i++) {
205       fieldNames.add(index.getFieldKeys()[i].name());
206     }
207
208     return Optional.of(ChampObjectIndex.create()
209         .ofName(indexName)
210         .onType(ChampObject.ReservedTypes.ANY.toString())
211         .forFields(fieldNames)
212         .build());
213   }
214
215   @Override
216   public Stream<ChampObjectIndex> retrieveObjectIndices() {
217     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
218
219     final TitanManagement createIndexMgmt = getGraph().openManagement();
220     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
221
222     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
223
224       private ChampObjectIndex next;
225
226       @Override
227       public boolean hasNext() {
228         if (indices.hasNext()) {
229           final TitanGraphIndex index = indices.next();
230           
231           List<String> fieldNames = new ArrayList<String>();
232           for (int i = 0; i < index.getFieldKeys().length; i++) {
233             fieldNames.add(index.getFieldKeys()[i].name());
234           }
235
236           next = ChampObjectIndex.create()
237               .ofName(index.name())
238               .onType(ChampObject.ReservedTypes.ANY.toString())
239               .forFields(fieldNames)
240               .build();
241           return true;
242         }
243
244         next = null;
245         return false;
246       }
247
248       @Override
249       public ChampObjectIndex next() {
250         if (next == null) throw new NoSuchElementException();
251
252         return next;
253       }
254     };
255
256     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
257         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
258   }
259
260   public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
261     if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
262
263     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
264   }
265
266   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
267     if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
268
269     final TitanGraph graph = getGraph();
270     final TitanManagement createIndexMgmt = graph.openManagement();
271     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
272
273     if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
274     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
275
276     createIndexMgmt.commit();
277     graph.tx().commit();
278
279     awaitIndexCreation(index.getName());
280   }
281
282   @Override
283   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
284     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
285
286     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
287     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
288
289     if (index == null) return Optional.empty();
290     if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
291
292     return Optional.of(ChampRelationshipIndex.create()
293         .ofName(indexName)
294         .onType(ChampObject.ReservedTypes.ANY.toString())
295         .forField(index.getFieldKeys()[0].name())
296         .build());
297   }
298
299   @Override
300   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
301     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
302
303     final TitanManagement createIndexMgmt = getGraph().openManagement();
304     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
305
306     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
307
308       private ChampRelationshipIndex next;
309
310       @Override
311       public boolean hasNext() {
312         if (indices.hasNext()) {
313           final TitanGraphIndex index = indices.next();
314
315           next = ChampRelationshipIndex.create()
316               .ofName(index.name())
317               .onType(ChampRelationship.ReservedTypes.ANY.toString())
318               .forField(index.getFieldKeys()[0].name())
319               .build();
320           return true;
321         }
322
323         next = null;
324         return false;
325       }
326
327       @Override
328       public ChampRelationshipIndex next() {
329         if (next == null) throw new NoSuchElementException();
330
331         return next;
332       }
333     };
334
335     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
336         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
337   }
338
339   public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
340     if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
341
342     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
343   }
344
345   private Cardinality getTitanCardinality(ChampCardinality cardinality) {
346     switch (cardinality) {
347       case LIST:
348         return Cardinality.LIST;
349       case SET:
350         return Cardinality.SET;
351       case SINGLE:
352         return Cardinality.SINGLE;
353       default:
354         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
355     }
356   }
357
358   private void awaitIndexCreation(String indexName) {
359     //Wait for the index to become available
360     try {
361       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
362           .status(SchemaStatus.ENABLED)
363           .timeout(1, ChronoUnit.SECONDS)
364           .call()
365           .getSucceeded()) {
366         return; //Empty graphs immediately ENABLE indices
367       }
368
369       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
370           .status(SchemaStatus.REGISTERED)
371           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
372           .call()
373           .getSucceeded()) {
374         LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
375         return;
376       }
377     } catch (InterruptedException e) {
378       LOGGER.warn("Interrupted while waiting for object index creation status");
379       Thread.currentThread().interrupt();
380       return;
381     }
382
383     //Reindex the existing data
384
385     try {
386       final TitanManagement updateIndexMgmt = graph.openManagement();
387       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
388       updateIndexMgmt.commit();
389     } catch (InterruptedException e) {
390       LOGGER.warn("Interrupted while reindexing for object index");
391       Thread.currentThread().interrupt();
392       return;
393     } catch (ExecutionException e) {
394       LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
395     }
396
397     try {
398       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
399           .status(SchemaStatus.ENABLED)
400           .timeout(10, ChronoUnit.MINUTES)
401           .call();
402     } catch (InterruptedException e) {
403       LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
404       Thread.currentThread().interrupt();
405       return;
406     }
407   }
408
409   @Override
410   public ChampCapabilities capabilities() {
411     return CAPABILITIES;
412   }
413
414   @Override
415   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
416     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
417
418     final ChampSchema currentSchema = retrieveSchema();
419     final TitanManagement mgmt = getGraph().openManagement();
420
421     try {
422       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
423         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
424           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
425
426           if (currentObjConstraint.isPresent()) {
427             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
428
429             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
430               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
431             }
432           }
433
434           final String newPropertyKeyName = propConstraint.getField().getName();
435
436           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
437
438           mgmt.makePropertyKey(newPropertyKeyName)
439               .dataType(propConstraint.getField().getJavaType())
440               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
441               .make();
442         }
443       }
444
445       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
446
447         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
448
449         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
450
451           if (currentRelConstraint.isPresent()) {
452             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
453
454             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
455               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
456             }
457           }
458
459           final String newPropertyKeyName = propConstraint.getField().getName();
460
461           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
462
463           mgmt.makePropertyKey(newPropertyKeyName)
464               .dataType(propConstraint.getField().getJavaType())
465               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
466               .make();
467         }
468
469         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
470
471         if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
472             .directed()
473             .make();
474       }
475
476       mgmt.commit();
477
478       super.storeSchema(schema);
479     } catch (SchemaViolationException | ChampSchemaViolationException e) {
480       mgmt.rollback();
481       throw new ChampSchemaViolationException(e);
482     }
483   }
484
485         @Override
486         public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
487                 return query.hasLabel((String) type);
488         }
489
490   @Override
491   public void createDefaultIndexes() {
492     LOGGER.error("No default indexes being created");
493   }
494 }