Fix handling of InterruptedException in AAI-Champ
[aai/champ.git] / champ-lib / champ-titan / src / main / java / org / onap / aai / champtitan / graph / impl / TitanChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champtitan.graph.impl;
22
23 import java.security.SecureRandom;
24 import java.time.temporal.ChronoUnit;
25 import java.util.*;
26 import java.util.Map.Entry;
27 import java.util.concurrent.ExecutionException;
28 import java.util.stream.Stream;
29 import java.util.stream.StreamSupport;
30
31 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
32 import org.apache.tinkerpop.gremlin.structure.Edge;
33 import org.apache.tinkerpop.gremlin.structure.Vertex;
34 import org.onap.aai.champcore.ChampCapabilities;
35 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
36 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
37 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
38 import org.onap.aai.champcore.model.ChampCardinality;
39 import org.onap.aai.champcore.model.ChampObject;
40 import org.onap.aai.champcore.model.ChampObjectConstraint;
41 import org.onap.aai.champcore.model.ChampObjectIndex;
42 import org.onap.aai.champcore.model.ChampPropertyConstraint;
43 import org.onap.aai.champcore.model.ChampRelationship;
44 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
45 import org.onap.aai.champcore.model.ChampRelationshipIndex;
46 import org.onap.aai.champcore.model.ChampSchema;
47 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
48 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 import com.thinkaurelius.titan.core.Cardinality;
53 import com.thinkaurelius.titan.core.EdgeLabel;
54 import com.thinkaurelius.titan.core.PropertyKey;
55 import com.thinkaurelius.titan.core.SchemaViolationException;
56 import com.thinkaurelius.titan.core.TitanEdge;
57 import com.thinkaurelius.titan.core.TitanFactory;
58 import com.thinkaurelius.titan.core.TitanGraph;
59 import com.thinkaurelius.titan.core.TitanVertex;
60 import com.thinkaurelius.titan.core.schema.SchemaAction;
61 import com.thinkaurelius.titan.core.schema.SchemaStatus;
62 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
63 import com.thinkaurelius.titan.core.schema.TitanManagement;
64 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
65
66 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
67
  private static final Logger LOGGER = LoggerFactory.getLogger(TitanChampGraphImpl.class);
  // Titan configuration path that the constructor sets to a randomly generated short value.
  private static final String TITAN_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
  // Titan configuration path that the constructor sets to the graph name for Cassandra-family backends;
  // the Builder rejects it in user-supplied properties for that reason.
  private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
  // Titan configuration path that the constructor sets to the graph name for the hbase backend.
  private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
  // Shared enforcer instance handed out by getSchemaEnforcer().
  private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
  // Seconds awaitIndexCreation() waits for a new index to reach the REGISTERED status.
  private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
74
  // Capabilities advertised by this implementation: neither object nor relationship indices
  // can be deleted (the executeDelete*Index methods throw UnsupportedOperationException).
  private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {

    @Override
    public boolean canDeleteObjectIndices() {
      return false;
    }

    @Override
    public boolean canDeleteRelationshipIndices() {
      return false;
    }
  };
87
  // The Titan graph opened by the constructor; returned by getGraph().
  private final TitanGraph graph;
89
90   public TitanChampGraphImpl(Builder builder) {
91     super(builder.graphConfiguration);
92     final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();
93
94     for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
95       titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
96     }
97
98     titanGraphBuilder.set(TITAN_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE)+""));
99     
100     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
101
102     if ("cassandra".equals(storageBackend) ||
103         "cassandrathrift".equals(storageBackend) ||
104         "astyanax".equals(storageBackend) ||
105         "embeddedcassandra".equals(storageBackend)) {
106       titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
107     } else if ("hbase".equals(storageBackend)) {
108       titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
109     } else if ("berkleyje".equals(storageBackend)) {
110       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
111     } else if ("inmemory".equals(storageBackend)) {
112     } else {
113       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
114     }
115     
116     LOGGER.info("Instantiated data access layer for Titan graph data store with backend: " + storageBackend);
117
118     this.graph = titanGraphBuilder.open();
119   }
120
121   public static class Builder {
122     private final String graphName;
123
124     private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
125
126     public Builder(String graphName) {
127       this.graphName = graphName;
128     }
129     
130     public Builder(String graphName, Map<String, Object> properties) {
131         this.graphName = graphName;
132         properties(properties);
133     }
134
135     public Builder properties(Map<String, Object> properties) {
136       if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
137         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
138             + " in initial configuration - this path is used"
139             + " to specify graph names");
140
141       this.graphConfiguration.putAll(properties);
142       return this;
143     }
144
145     public Builder property(String path, Object value) {
146       if (path.equals(TITAN_CASSANDRA_KEYSPACE))
147         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
148             + " in initial configuration - this path is used"
149             + " to specify graph names");
150       graphConfiguration.put(path, value);
151       return this;
152     }
153
154     public TitanChampGraphImpl build() {
155       return new TitanChampGraphImpl(this);
156     }
157   }
158
  /**
   * @return the Titan graph opened by the constructor
   */
  @Override
  protected TitanGraph getGraph() {
    return graph;
  }
163
  /**
   * @return the shared {@link DefaultChampSchemaEnforcer} instance used by this class
   */
  @Override
  protected ChampSchemaEnforcer getSchemaEnforcer() {
    return SCHEMA_ENFORCER;
  }
168
169   public void executeStoreObjectIndex(ChampObjectIndex index) {
170     if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
171
172     final TitanGraph graph = getGraph();
173     final TitanManagement createIndexMgmt = graph.openManagement();
174     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
175
176     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
177       createIndexMgmt.rollback();
178       return; //Ignore, index already exists
179     }
180
181     createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
182
183     createIndexMgmt.commit();
184     graph.tx().commit();
185
186     awaitIndexCreation(index.getName());
187   }
188
189   @Override
190   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
191     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
192
193     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
194     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
195
196     if (index == null) return Optional.empty();
197     if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
198
199     return Optional.of(ChampObjectIndex.create()
200         .ofName(indexName)
201         .onType(ChampObject.ReservedTypes.ANY.toString())
202         .forField(index.getFieldKeys()[0].name())
203         .build());
204   }
205
206   @Override
207   public Stream<ChampObjectIndex> retrieveObjectIndices() {
208     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
209
210     final TitanManagement createIndexMgmt = getGraph().openManagement();
211     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
212
213     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
214
215       private ChampObjectIndex next;
216
217       @Override
218       public boolean hasNext() {
219         if (indices.hasNext()) {
220           final TitanGraphIndex index = indices.next();
221
222           next = ChampObjectIndex.create()
223               .ofName(index.name())
224               .onType(ChampObject.ReservedTypes.ANY.toString())
225               .forField(index.getFieldKeys()[0].name())
226               .build();
227           return true;
228         }
229
230         next = null;
231         return false;
232       }
233
234       @Override
235       public ChampObjectIndex next() {
236         if (next == null) throw new NoSuchElementException();
237
238         return next;
239       }
240     };
241
242     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
243         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
244   }
245
246   public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
247     if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
248
249     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
250   }
251
252   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
253     if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
254
255     final TitanGraph graph = getGraph();
256     final TitanManagement createIndexMgmt = graph.openManagement();
257     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
258
259     if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
260     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
261
262     createIndexMgmt.commit();
263     graph.tx().commit();
264
265     awaitIndexCreation(index.getName());
266   }
267
268   @Override
269   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
270     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
271
272     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
273     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
274
275     if (index == null) return Optional.empty();
276     if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
277
278     return Optional.of(ChampRelationshipIndex.create()
279         .ofName(indexName)
280         .onType(ChampObject.ReservedTypes.ANY.toString())
281         .forField(index.getFieldKeys()[0].name())
282         .build());
283   }
284
285   @Override
286   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
287     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
288
289     final TitanManagement createIndexMgmt = getGraph().openManagement();
290     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
291
292     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
293
294       private ChampRelationshipIndex next;
295
296       @Override
297       public boolean hasNext() {
298         if (indices.hasNext()) {
299           final TitanGraphIndex index = indices.next();
300
301           next = ChampRelationshipIndex.create()
302               .ofName(index.name())
303               .onType(ChampRelationship.ReservedTypes.ANY.toString())
304               .forField(index.getFieldKeys()[0].name())
305               .build();
306           return true;
307         }
308
309         next = null;
310         return false;
311       }
312
313       @Override
314       public ChampRelationshipIndex next() {
315         if (next == null) throw new NoSuchElementException();
316
317         return next;
318       }
319     };
320
321     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
322         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
323   }
324
325   public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
326     if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
327
328     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
329   }
330
331   private Cardinality getTitanCardinality(ChampCardinality cardinality) {
332     switch (cardinality) {
333       case LIST:
334         return Cardinality.LIST;
335       case SET:
336         return Cardinality.SET;
337       case SINGLE:
338         return Cardinality.SINGLE;
339       default:
340         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
341     }
342   }
343
344   private void awaitIndexCreation(String indexName) {
345     //Wait for the index to become available
346     try {
347       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
348           .status(SchemaStatus.ENABLED)
349           .timeout(1, ChronoUnit.SECONDS)
350           .call()
351           .getSucceeded()) {
352         return; //Empty graphs immediately ENABLE indices
353       }
354
355       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
356           .status(SchemaStatus.REGISTERED)
357           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
358           .call()
359           .getSucceeded()) {
360         LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
361         return;
362       }
363     } catch (InterruptedException e) {
364       LOGGER.warn("Interrupted while waiting for object index creation status");
365       Thread.currentThread().interrupt();
366       return;
367     }
368
369     //Reindex the existing data
370
371     try {
372       final TitanManagement updateIndexMgmt = graph.openManagement();
373       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
374       updateIndexMgmt.commit();
375     } catch (InterruptedException e) {
376       LOGGER.warn("Interrupted while reindexing for object index");
377       Thread.currentThread().interrupt();
378       return;
379     } catch (ExecutionException e) {
380       LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
381     }
382
383     try {
384       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
385           .status(SchemaStatus.ENABLED)
386           .timeout(10, ChronoUnit.MINUTES)
387           .call();
388     } catch (InterruptedException e) {
389       LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
390       Thread.currentThread().interrupt();
391       return;
392     }
393   }
394
  /**
   * @return the fixed capability set of this implementation, which reports that
   *         neither object nor relationship indices can be deleted
   */
  @Override
  public ChampCapabilities capabilities() {
    return CAPABILITIES;
  }
399
400   @Override
401   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
402     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
403
404     final ChampSchema currentSchema = retrieveSchema();
405     final TitanManagement mgmt = getGraph().openManagement();
406
407     try {
408       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
409         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
410           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
411
412           if (currentObjConstraint.isPresent()) {
413             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
414
415             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
416               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
417             }
418           }
419
420           final String newPropertyKeyName = propConstraint.getField().getName();
421
422           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
423
424           mgmt.makePropertyKey(newPropertyKeyName)
425               .dataType(propConstraint.getField().getJavaType())
426               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
427               .make();
428         }
429       }
430
431       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
432
433         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
434
435         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
436
437           if (currentRelConstraint.isPresent()) {
438             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
439
440             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
441               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
442             }
443           }
444
445           final String newPropertyKeyName = propConstraint.getField().getName();
446
447           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
448
449           mgmt.makePropertyKey(newPropertyKeyName)
450               .dataType(propConstraint.getField().getJavaType())
451               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
452               .make();
453         }
454
455         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
456
457         if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
458             .directed()
459             .make();
460       }
461
462       mgmt.commit();
463
464       super.storeSchema(schema);
465     } catch (SchemaViolationException | ChampSchemaViolationException e) {
466       mgmt.rollback();
467       throw new ChampSchemaViolationException(e);
468     }
469   }
470
471         @Override
472         public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
473                 return query.hasLabel((String) type);
474         }
475 }