[AAI-80 Amsterdam] checking in source code
[aai/champ.git] / src / main / java / org / openecomp / aai / champ / graph / impl / TitanChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.openecomp.aai.champ.graph.impl;
23
24 import java.time.temporal.ChronoUnit;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.NoSuchElementException;
30 import java.util.Optional;
31 import java.util.Spliterator;
32 import java.util.Spliterators;
33 import java.util.concurrent.ExecutionException;
34 import java.util.stream.Stream;
35 import java.util.stream.StreamSupport;
36
37 import org.apache.tinkerpop.gremlin.structure.Edge;
38 import org.apache.tinkerpop.gremlin.structure.Vertex;
39 import org.openecomp.aai.champ.ChampCapabilities;
40 import org.openecomp.aai.champ.exceptions.ChampIndexNotExistsException;
41 import org.openecomp.aai.champ.exceptions.ChampSchemaViolationException;
42 import org.openecomp.aai.champ.model.ChampCardinality;
43 import org.openecomp.aai.champ.model.ChampObject;
44 import org.openecomp.aai.champ.model.ChampObjectConstraint;
45 import org.openecomp.aai.champ.model.ChampObjectIndex;
46 import org.openecomp.aai.champ.model.ChampPropertyConstraint;
47 import org.openecomp.aai.champ.model.ChampRelationship;
48 import org.openecomp.aai.champ.model.ChampRelationshipConstraint;
49 import org.openecomp.aai.champ.model.ChampRelationshipIndex;
50 import org.openecomp.aai.champ.model.ChampSchema;
51 import org.openecomp.aai.champ.schema.ChampSchemaEnforcer;
52 import org.openecomp.aai.champ.schema.DefaultChampSchemaEnforcer;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import com.thinkaurelius.titan.core.Cardinality;
57 import com.thinkaurelius.titan.core.EdgeLabel;
58 import com.thinkaurelius.titan.core.PropertyKey;
59 import com.thinkaurelius.titan.core.SchemaViolationException;
60 import com.thinkaurelius.titan.core.TitanEdge;
61 import com.thinkaurelius.titan.core.TitanFactory;
62 import com.thinkaurelius.titan.core.TitanGraph;
63 import com.thinkaurelius.titan.core.TitanVertex;
64 import com.thinkaurelius.titan.core.schema.SchemaAction;
65 import com.thinkaurelius.titan.core.schema.SchemaStatus;
66 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
67 import com.thinkaurelius.titan.core.schema.TitanManagement;
68 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
69
70 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
71
72         private static final Logger LOGGER = LoggerFactory.getLogger(TitanChampGraphImpl.class);
73         private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
74         private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
75         private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
76         private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
77
78         private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
79
80                 @Override
81                 public boolean canDeleteObjectIndices() {
82                         return false;
83                 }
84
85                 @Override
86                 public boolean canDeleteRelationshipIndices() {
87                         return false;
88                 }
89         };
90
91         private final TitanGraph graph;
92
93         private TitanChampGraphImpl(Builder builder) {
94             super(builder.graphConfiguration);
95                 final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();
96
97                 for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
98                         titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
99                 }
100
101                 final Object storageBackend = builder.graphConfiguration.get("storage.backend");
102
103                 if (storageBackend.equals("cassandra") ||
104                         storageBackend.equals("cassandrathrift") ||
105                         storageBackend.equals("astyanax") ||
106                         storageBackend.equals("embeddedcassandra")) {
107                         titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
108                 } else if (storageBackend.equals("hbase")) {
109                         titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
110                 } else if (storageBackend.equals("berkleyje")) {
111                         throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
112                 } else if (storageBackend.equals("inmemory")) {
113                 } else {
114                         throw new RuntimeException("Unknown storage.backend=" + storageBackend);
115                 }
116                 
117                 this.graph = titanGraphBuilder.open();
118         }
119
120         public static class Builder {
121                 private final String graphName;
122
123                 private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
124
125                 public Builder(String graphName) {
126                         this.graphName = graphName;
127                 }
128
129                 public Builder properties(Map<String, Object> properties) {
130                         if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
131                                 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE 
132                                                                                                         + " in initial configuration - this path is used"
133                                                                                                         + " to specify graph names");
134
135                         this.graphConfiguration.putAll(properties);
136                         return this;
137                 }
138
139                 public Builder property(String path, Object value) {
140                         if (path.equals(TITAN_CASSANDRA_KEYSPACE))
141                                 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE 
142                                                                                                         + " in initial configuration - this path is used"
143                                                                                                         + " to specify graph names");
144                         graphConfiguration.put(path, value);
145                         return this;
146                 }
147
148                 public TitanChampGraphImpl build() {
149                         return new TitanChampGraphImpl(this);
150                 }
151         }
152
153         @Override
154         protected TitanGraph getGraph() {
155                 return graph;
156         }
157
158         @Override
159         protected ChampSchemaEnforcer getSchemaEnforcer() {
160                 return SCHEMA_ENFORCER;
161         }
162
163         public void executeStoreObjectIndex(ChampObjectIndex index) {
164                 if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
165
166                 final TitanGraph graph = getGraph();
167                 final TitanManagement createIndexMgmt = graph.openManagement();
168                 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
169
170                 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
171                         createIndexMgmt.rollback();
172                         return; //Ignore, index already exists
173                 }
174                 
175                 createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
176                 
177                 createIndexMgmt.commit();
178                 graph.tx().commit();
179                 
180                 awaitIndexCreation(index.getName());
181         }
182
183         @Override
184         public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
185                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
186
187                 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
188                 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
189
190                 if (index == null) return Optional.empty();
191                 if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
192
193                 return Optional.of(ChampObjectIndex.create()
194                                                                                         .ofName(indexName)
195                                                                                         .onType(ChampObject.ReservedTypes.ANY.toString())
196                                                                                         .forField(index.getFieldKeys()[0].name())
197                                                                                         .build());
198         }
199
200         @Override
201         public Stream<ChampObjectIndex> retrieveObjectIndices() {
202                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
203
204                 final TitanManagement createIndexMgmt = getGraph().openManagement();
205                 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
206                 
207                 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
208         
209                         private ChampObjectIndex next;
210
211                         @Override
212                         public boolean hasNext() {
213                                 if (indices.hasNext()) {
214                                         final TitanGraphIndex index = indices.next();
215
216                                         next = ChampObjectIndex.create()
217                                                         .ofName(index.name())
218                                                         .onType(ChampObject.ReservedTypes.ANY.toString())
219                                                         .forField(index.getFieldKeys()[0].name())
220                                                         .build();
221                                         return true;
222                                 }
223
224                                 next = null;
225                                 return false;
226                         }
227
228                         @Override
229                         public ChampObjectIndex next() {
230                                 if (next == null) throw new NoSuchElementException();
231                                 
232                                 return next;
233                         }
234                 };
235
236                 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
237                 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
238         }
239
240         public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
241                 if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
242
243                 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
244         }
245
246         public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
247                 if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
248
249                 final TitanGraph graph = getGraph();
250                 final TitanManagement createIndexMgmt = graph.openManagement();
251                 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
252
253                 if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
254                 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
255                 
256                 createIndexMgmt.commit();
257                 graph.tx().commit();
258
259                 awaitIndexCreation(index.getName());
260         }
261
262         @Override
263         public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
264                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
265
266                 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
267                 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
268
269                 if (index == null) return Optional.empty();
270                 if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
271
272                 return Optional.of(ChampRelationshipIndex.create()
273                                                                                         .ofName(indexName)
274                                                                                         .onType(ChampObject.ReservedTypes.ANY.toString())
275                                                                                         .forField(index.getFieldKeys()[0].name())
276                                                                                         .build());
277         }
278
279         @Override
280         public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
281                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
282
283                 final TitanManagement createIndexMgmt = getGraph().openManagement();
284                 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
285                 
286                 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
287         
288                         private ChampRelationshipIndex next;
289
290                         @Override
291                         public boolean hasNext() {
292                                 if (indices.hasNext()) {
293                                         final TitanGraphIndex index = indices.next();
294
295                                         next = ChampRelationshipIndex.create()
296                                                         .ofName(index.name())
297                                                         .onType(ChampRelationship.ReservedTypes.ANY.toString())
298                                                         .forField(index.getFieldKeys()[0].name())
299                                                         .build();
300                                         return true;
301                                 }
302
303                                 next = null;
304                                 return false;
305                         }
306
307                         @Override
308                         public ChampRelationshipIndex next() {
309                                 if (next == null) throw new NoSuchElementException();
310                                 
311                                 return next;
312                         }
313                 };
314
315                 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
316                 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
317         }
318
319         public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
320                 if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
321
322                 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
323         }
324
325         private Cardinality getTitanCardinality(ChampCardinality cardinality) {
326                 switch (cardinality) {
327                 case LIST:
328                         return Cardinality.LIST;
329                 case SET:
330                         return Cardinality.SET;
331                 case SINGLE:
332                         return Cardinality.SINGLE;
333                 default:
334                         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
335                 }
336         }
337
338         private void awaitIndexCreation(String indexName) {
339         //Wait for the index to become available
340                 try {
341                         if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
342                                                         .status(SchemaStatus.ENABLED)
343                                                         .timeout(1, ChronoUnit.SECONDS)
344                                                         .call()
345                                                         .getSucceeded()) {
346                                 return; //Empty graphs immediately ENABLE indices
347                         }
348
349                         if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
350                                                         .status(SchemaStatus.REGISTERED)
351                                                         .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
352                                                         .call()
353                                                         .getSucceeded()) {
354                                 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
355                                 return;
356                         }
357                 } catch (InterruptedException e) {
358                         LOGGER.warn("Interrupted while waiting for object index creation status");
359                         return;
360                 }
361
362                 //Reindex the existing data
363
364                 try {
365                         final TitanManagement updateIndexMgmt = graph.openManagement();
366                         updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
367                         updateIndexMgmt.commit();
368                 } catch (InterruptedException e) {
369                         LOGGER.warn("Interrupted while reindexing for object index");
370                         return;
371                 } catch (ExecutionException e) {
372                         LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
373                 }
374                 
375                 try {
376                         ManagementSystem.awaitGraphIndexStatus(graph, indexName)
377                                                         .status(SchemaStatus.ENABLED)
378                                                         .timeout(10, ChronoUnit.MINUTES)
379                                                         .call();
380                 } catch (InterruptedException e) {
381                         LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
382                         return;
383                 }
384         }
385
386         @Override
387         public ChampCapabilities capabilities() {
388                 return CAPABILITIES;
389         }
390
391         @Override
392         public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
393                 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
394
395                 final ChampSchema currentSchema = retrieveSchema();
396                 final TitanManagement mgmt = getGraph().openManagement();
397
398                 try {
399                         for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
400                                 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
401                                         final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
402                                         
403                                         if (currentObjConstraint.isPresent()) {
404                                                 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
405
406                                                 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
407                                                         throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
408                                                 }
409                                         }
410
411                                         final String newPropertyKeyName = propConstraint.getField().getName();
412
413                                         if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
414                                         
415                                         mgmt.makePropertyKey(newPropertyKeyName)
416                                                 .dataType(propConstraint.getField().getJavaType())
417                                                 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
418                                                 .make();
419                                 }
420                         }
421
422                         for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
423
424                                 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
425
426                                 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
427         
428                                         if (currentRelConstraint.isPresent()) {
429                                                 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
430
431                                                 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
432                                                         throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
433                                                 }
434                                         }
435
436                                         final String newPropertyKeyName = propConstraint.getField().getName();
437
438                                         if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
439
440                                         mgmt.makePropertyKey(newPropertyKeyName)
441                                                         .dataType(propConstraint.getField().getJavaType())
442                                                         .cardinality(getTitanCardinality(propConstraint.getCardinality()))
443                                                         .make();
444                                 }
445
446                                 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
447
448                                 if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
449                                                                                         .directed()
450                                                                                         .make();
451                         }
452
453                         mgmt.commit();
454
455                         super.storeSchema(schema);
456                 } catch (SchemaViolationException | ChampSchemaViolationException e) {
457                         mgmt.rollback();
458                         throw new ChampSchemaViolationException(e);
459                 }
460         }
461 }