Update license date and text
[aai/champ.git] / champ-lib / champ-titan / src / main / java / org / onap / aai / champtitan / graph / impl / TitanChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champtitan.graph.impl;
22
23 import java.time.temporal.ChronoUnit;
24 import java.util.*;
25 import java.util.Map.Entry;
26 import java.util.concurrent.ExecutionException;
27 import java.util.stream.Stream;
28 import java.util.stream.StreamSupport;
29
30 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
31 import org.apache.tinkerpop.gremlin.structure.Edge;
32 import org.apache.tinkerpop.gremlin.structure.Vertex;
33 import org.onap.aai.champcore.ChampCapabilities;
34 import org.onap.aai.champcore.FormatMapper;
35 import org.onap.aai.champcore.Formatter;
36 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
37 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
38 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
39 import org.onap.aai.champcore.model.ChampCardinality;
40 import org.onap.aai.champcore.model.ChampObject;
41 import org.onap.aai.champcore.model.ChampObjectConstraint;
42 import org.onap.aai.champcore.model.ChampObjectIndex;
43 import org.onap.aai.champcore.model.ChampPropertyConstraint;
44 import org.onap.aai.champcore.model.ChampRelationship;
45 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
46 import org.onap.aai.champcore.model.ChampRelationshipIndex;
47 import org.onap.aai.champcore.model.ChampSchema;
48 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
49 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import com.thinkaurelius.titan.core.Cardinality;
54 import com.thinkaurelius.titan.core.EdgeLabel;
55 import com.thinkaurelius.titan.core.PropertyKey;
56 import com.thinkaurelius.titan.core.SchemaViolationException;
57 import com.thinkaurelius.titan.core.TitanEdge;
58 import com.thinkaurelius.titan.core.TitanFactory;
59 import com.thinkaurelius.titan.core.TitanGraph;
60 import com.thinkaurelius.titan.core.TitanVertex;
61 import com.thinkaurelius.titan.core.schema.SchemaAction;
62 import com.thinkaurelius.titan.core.schema.SchemaStatus;
63 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
64 import com.thinkaurelius.titan.core.schema.TitanManagement;
65 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
66
67 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
68
69   private static final Logger LOGGER = LoggerFactory.getLogger(TitanChampGraphImpl.class);
70   private static final String TITAN_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
71   private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
72   private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
73   private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
74   private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
75
76   private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
77
78     @Override
79     public boolean canDeleteObjectIndices() {
80       return false;
81     }
82
83     @Override
84     public boolean canDeleteRelationshipIndices() {
85       return false;
86     }
87   };
88
89   private final TitanGraph graph;
90
91   public TitanChampGraphImpl(Builder builder) {
92     super(builder.graphConfiguration);
93     final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();
94
95     for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
96       titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
97     }
98
99     titanGraphBuilder.set(TITAN_UNIQUE_SUFFIX, ((short) new Random().nextInt(Short.MAX_VALUE)+""));
100     
101     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
102
103     if ("cassandra".equals(storageBackend) ||
104         "cassandrathrift".equals(storageBackend) ||
105         "astyanax".equals(storageBackend) ||
106         "embeddedcassandra".equals(storageBackend)) {
107       titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
108     } else if ("hbase".equals(storageBackend)) {
109       titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
110     } else if ("berkleyje".equals(storageBackend)) {
111       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
112     } else if ("inmemory".equals(storageBackend)) {
113     } else {
114       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
115     }
116     
117     LOGGER.info("Instantiated data access layer for Titan graph data store with backend: " + storageBackend);
118
119     this.graph = titanGraphBuilder.open();
120   }
121
122   public static class Builder {
123     private final String graphName;
124
125     private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
126
127     public Builder(String graphName) {
128       this.graphName = graphName;
129     }
130     
131     public Builder(String graphName, Map<String, Object> properties) {
132         this.graphName = graphName;
133         properties(properties);
134     }
135
136     public Builder properties(Map<String, Object> properties) {
137       if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
138         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
139             + " in initial configuration - this path is used"
140             + " to specify graph names");
141
142       this.graphConfiguration.putAll(properties);
143       return this;
144     }
145
146     public Builder property(String path, Object value) {
147       if (path.equals(TITAN_CASSANDRA_KEYSPACE))
148         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
149             + " in initial configuration - this path is used"
150             + " to specify graph names");
151       graphConfiguration.put(path, value);
152       return this;
153     }
154
155     public TitanChampGraphImpl build() {
156       return new TitanChampGraphImpl(this);
157     }
158   }
159
160   @Override
161   protected TitanGraph getGraph() {
162     return graph;
163   }
164
165   @Override
166   protected ChampSchemaEnforcer getSchemaEnforcer() {
167     return SCHEMA_ENFORCER;
168   }
169
170   public void executeStoreObjectIndex(ChampObjectIndex index) {
171     if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
172
173     final TitanGraph graph = getGraph();
174     final TitanManagement createIndexMgmt = graph.openManagement();
175     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
176
177     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
178       createIndexMgmt.rollback();
179       return; //Ignore, index already exists
180     }
181
182     createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
183
184     createIndexMgmt.commit();
185     graph.tx().commit();
186
187     awaitIndexCreation(index.getName());
188   }
189
190   @Override
191   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
192     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
193
194     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
195     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
196
197     if (index == null) return Optional.empty();
198     if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
199
200     return Optional.of(ChampObjectIndex.create()
201         .ofName(indexName)
202         .onType(ChampObject.ReservedTypes.ANY.toString())
203         .forField(index.getFieldKeys()[0].name())
204         .build());
205   }
206
207   @Override
208   public Stream<ChampObjectIndex> retrieveObjectIndices() {
209     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
210
211     final TitanManagement createIndexMgmt = getGraph().openManagement();
212     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
213
214     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
215
216       private ChampObjectIndex next;
217
218       @Override
219       public boolean hasNext() {
220         if (indices.hasNext()) {
221           final TitanGraphIndex index = indices.next();
222
223           next = ChampObjectIndex.create()
224               .ofName(index.name())
225               .onType(ChampObject.ReservedTypes.ANY.toString())
226               .forField(index.getFieldKeys()[0].name())
227               .build();
228           return true;
229         }
230
231         next = null;
232         return false;
233       }
234
235       @Override
236       public ChampObjectIndex next() {
237         if (next == null) throw new NoSuchElementException();
238
239         return next;
240       }
241     };
242
243     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
244         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
245   }
246
247   public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
248     if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
249
250     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
251   }
252
253   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
254     if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
255
256     final TitanGraph graph = getGraph();
257     final TitanManagement createIndexMgmt = graph.openManagement();
258     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
259
260     if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
261     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
262
263     createIndexMgmt.commit();
264     graph.tx().commit();
265
266     awaitIndexCreation(index.getName());
267   }
268
269   @Override
270   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
271     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
272
273     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
274     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
275
276     if (index == null) return Optional.empty();
277     if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
278
279     return Optional.of(ChampRelationshipIndex.create()
280         .ofName(indexName)
281         .onType(ChampObject.ReservedTypes.ANY.toString())
282         .forField(index.getFieldKeys()[0].name())
283         .build());
284   }
285
286   @Override
287   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
288     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
289
290     final TitanManagement createIndexMgmt = getGraph().openManagement();
291     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
292
293     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
294
295       private ChampRelationshipIndex next;
296
297       @Override
298       public boolean hasNext() {
299         if (indices.hasNext()) {
300           final TitanGraphIndex index = indices.next();
301
302           next = ChampRelationshipIndex.create()
303               .ofName(index.name())
304               .onType(ChampRelationship.ReservedTypes.ANY.toString())
305               .forField(index.getFieldKeys()[0].name())
306               .build();
307           return true;
308         }
309
310         next = null;
311         return false;
312       }
313
314       @Override
315       public ChampRelationshipIndex next() {
316         if (next == null) throw new NoSuchElementException();
317
318         return next;
319       }
320     };
321
322     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
323         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
324   }
325
326   public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
327     if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
328
329     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
330   }
331
332   private Cardinality getTitanCardinality(ChampCardinality cardinality) {
333     switch (cardinality) {
334       case LIST:
335         return Cardinality.LIST;
336       case SET:
337         return Cardinality.SET;
338       case SINGLE:
339         return Cardinality.SINGLE;
340       default:
341         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
342     }
343   }
344
345   private void awaitIndexCreation(String indexName) {
346     //Wait for the index to become available
347     try {
348       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
349           .status(SchemaStatus.ENABLED)
350           .timeout(1, ChronoUnit.SECONDS)
351           .call()
352           .getSucceeded()) {
353         return; //Empty graphs immediately ENABLE indices
354       }
355
356       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
357           .status(SchemaStatus.REGISTERED)
358           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
359           .call()
360           .getSucceeded()) {
361         LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
362         return;
363       }
364     } catch (InterruptedException e) {
365       LOGGER.warn("Interrupted while waiting for object index creation status");
366       return;
367     }
368
369     //Reindex the existing data
370
371     try {
372       final TitanManagement updateIndexMgmt = graph.openManagement();
373       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
374       updateIndexMgmt.commit();
375     } catch (InterruptedException e) {
376       LOGGER.warn("Interrupted while reindexing for object index");
377       return;
378     } catch (ExecutionException e) {
379       LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
380     }
381
382     try {
383       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
384           .status(SchemaStatus.ENABLED)
385           .timeout(10, ChronoUnit.MINUTES)
386           .call();
387     } catch (InterruptedException e) {
388       LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
389       return;
390     }
391   }
392
393   @Override
394   public ChampCapabilities capabilities() {
395     return CAPABILITIES;
396   }
397
398   @Override
399   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
400     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
401
402     final ChampSchema currentSchema = retrieveSchema();
403     final TitanManagement mgmt = getGraph().openManagement();
404
405     try {
406       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
407         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
408           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
409
410           if (currentObjConstraint.isPresent()) {
411             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
412
413             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
414               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
415             }
416           }
417
418           final String newPropertyKeyName = propConstraint.getField().getName();
419
420           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
421
422           mgmt.makePropertyKey(newPropertyKeyName)
423               .dataType(propConstraint.getField().getJavaType())
424               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
425               .make();
426         }
427       }
428
429       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
430
431         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
432
433         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
434
435           if (currentRelConstraint.isPresent()) {
436             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
437
438             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
439               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
440             }
441           }
442
443           final String newPropertyKeyName = propConstraint.getField().getName();
444
445           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
446
447           mgmt.makePropertyKey(newPropertyKeyName)
448               .dataType(propConstraint.getField().getJavaType())
449               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
450               .make();
451         }
452
453         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
454
455         if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
456             .directed()
457             .make();
458       }
459
460       mgmt.commit();
461
462       super.storeSchema(schema);
463     } catch (SchemaViolationException | ChampSchemaViolationException e) {
464       mgmt.rollback();
465       throw new ChampSchemaViolationException(e);
466     }
467   }
468
469         @Override
470         public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
471                 return query.hasLabel((String) type);
472         }
473 }