Renaming openecomp to onap
[aai/champ.git] / src / main / java / org / onap / aai / champ / graph / impl / TitanChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champ.graph.impl;
23
24 import com.thinkaurelius.titan.core.Cardinality;
25 import com.thinkaurelius.titan.core.EdgeLabel;
26 import com.thinkaurelius.titan.core.PropertyKey;
27 import com.thinkaurelius.titan.core.SchemaViolationException;
28 import com.thinkaurelius.titan.core.TitanEdge;
29 import com.thinkaurelius.titan.core.TitanFactory;
30 import com.thinkaurelius.titan.core.TitanGraph;
31 import com.thinkaurelius.titan.core.TitanVertex;
32 import com.thinkaurelius.titan.core.schema.SchemaAction;
33 import com.thinkaurelius.titan.core.schema.SchemaStatus;
34 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
35 import com.thinkaurelius.titan.core.schema.TitanManagement;
36 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
37 import org.apache.tinkerpop.gremlin.structure.Edge;
38 import org.apache.tinkerpop.gremlin.structure.Vertex;
39 import org.onap.aai.champ.ChampCapabilities;
40 import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
42 import org.onap.aai.champ.model.ChampCardinality;
43 import org.onap.aai.champ.model.ChampObject;
44 import org.onap.aai.champ.model.ChampObjectConstraint;
45 import org.onap.aai.champ.model.ChampObjectIndex;
46 import org.onap.aai.champ.model.ChampPropertyConstraint;
47 import org.onap.aai.champ.model.ChampRelationship;
48 import org.onap.aai.champ.model.ChampRelationshipConstraint;
49 import org.onap.aai.champ.model.ChampRelationshipIndex;
50 import org.onap.aai.champ.model.ChampSchema;
51 import org.onap.aai.champ.schema.ChampSchemaEnforcer;
52 import org.onap.aai.champ.schema.DefaultChampSchemaEnforcer;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.time.temporal.ChronoUnit;
57 import java.util.HashMap;
58 import java.util.Iterator;
59 import java.util.Map;
60 import java.util.Map.Entry;
61 import java.util.NoSuchElementException;
62 import java.util.Optional;
63 import java.util.Spliterator;
64 import java.util.Spliterators;
65 import java.util.concurrent.ExecutionException;
66 import java.util.stream.Stream;
67 import java.util.stream.StreamSupport;
68
69 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
70
71         private static final Logger LOGGER = LoggerFactory.getLogger(TitanChampGraphImpl.class);
72         private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
73         private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
74         private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
75         private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
76
77         private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
78
79                 @Override
80                 public boolean canDeleteObjectIndices() {
81                         return false;
82                 }
83
84                 @Override
85                 public boolean canDeleteRelationshipIndices() {
86                         return false;
87                 }
88         };
89
90         private final TitanGraph graph;
91
92         private TitanChampGraphImpl(Builder builder) {
93             super(builder.graphConfiguration);
94                 final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();
95
96                 for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
97                         titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
98                 }
99
100                 final Object storageBackend = builder.graphConfiguration.get("storage.backend");
101
102                 if ("cassandra".equals(storageBackend) || "cassandrathrift".equals(storageBackend)
103                                 || "astyanax".equals(storageBackend) || "embeddedcassandra".equals(storageBackend)) {
104                         titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
105                 } else if ("hbase".equals(storageBackend)) {
106                         titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
107                 } else if ("berkleyje".equals(storageBackend)) {
108                         throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
109                 } else if ("inmemory".equals(storageBackend)) {
110                 } else {
111                         throw new RuntimeException("Unknown storage.backend=" + storageBackend);
112                 }
113                 
114                 this.graph = titanGraphBuilder.open();
115         }
116
117         public static class Builder {
118                 private final String graphName;
119
120                 private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
121
122                 public Builder(String graphName) {
123                         this.graphName = graphName;
124                 }
125
126                 public Builder properties(Map<String, Object> properties) {
127                         if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
128                                 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE 
129                                                                                                         + " in initial configuration - this path is used"
130                                                                                                         + " to specify graph names");
131
132                         this.graphConfiguration.putAll(properties);
133                         return this;
134                 }
135
136                 public Builder property(String path, Object value) {
137                         if (path.equals(TITAN_CASSANDRA_KEYSPACE))
138                                 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE 
139                                                                                                         + " in initial configuration - this path is used"
140                                                                                                         + " to specify graph names");
141                         graphConfiguration.put(path, value);
142                         return this;
143                 }
144
145                 public TitanChampGraphImpl build() {
146                         return new TitanChampGraphImpl(this);
147                 }
148         }
149
150         @Override
151         protected TitanGraph getGraph() {
152                 return graph;
153         }
154
155         @Override
156         protected ChampSchemaEnforcer getSchemaEnforcer() {
157                 return SCHEMA_ENFORCER;
158         }
159
160         public void executeStoreObjectIndex(ChampObjectIndex index) {
161                 if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
162
163                 final TitanGraph graph = getGraph();
164                 final TitanManagement createIndexMgmt = graph.openManagement();
165                 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
166
167                 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
168                         createIndexMgmt.rollback();
169                         return; //Ignore, index already exists
170                 }
171                 
172                 createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
173                 
174                 createIndexMgmt.commit();
175                 graph.tx().commit();
176                 
177                 awaitIndexCreation(index.getName());
178         }
179
180         @Override
181         public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
182                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
183
184                 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
185                 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
186
187                 if (index == null) return Optional.empty();
188                 if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
189
190                 return Optional.of(ChampObjectIndex.create()
191                                                                                         .ofName(indexName)
192                                                                                         .onType(ChampObject.ReservedTypes.ANY.toString())
193                                                                                         .forField(index.getFieldKeys()[0].name())
194                                                                                         .build());
195         }
196
197         @Override
198         public Stream<ChampObjectIndex> retrieveObjectIndices() {
199                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
200
201                 final TitanManagement createIndexMgmt = getGraph().openManagement();
202                 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
203                 
204                 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
205         
206                         private ChampObjectIndex next;
207
208                         @Override
209                         public boolean hasNext() {
210                                 if (indices.hasNext()) {
211                                         final TitanGraphIndex index = indices.next();
212
213                                         next = ChampObjectIndex.create()
214                                                         .ofName(index.name())
215                                                         .onType(ChampObject.ReservedTypes.ANY.toString())
216                                                         .forField(index.getFieldKeys()[0].name())
217                                                         .build();
218                                         return true;
219                                 }
220
221                                 next = null;
222                                 return false;
223                         }
224
225                         @Override
226                         public ChampObjectIndex next() {
227                                 if (next == null) throw new NoSuchElementException();
228                                 
229                                 return next;
230                         }
231                 };
232
233                 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
234                 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
235         }
236
237         public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
238                 if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
239
240                 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
241         }
242
243         public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
244                 if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
245
246                 final TitanGraph graph = getGraph();
247                 final TitanManagement createIndexMgmt = graph.openManagement();
248                 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
249
250                 if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
251                 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
252                 
253                 createIndexMgmt.commit();
254                 graph.tx().commit();
255
256                 awaitIndexCreation(index.getName());
257         }
258
259         @Override
260         public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
261                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
262
263                 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
264                 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
265
266                 if (index == null) return Optional.empty();
267                 if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
268
269                 return Optional.of(ChampRelationshipIndex.create()
270                                                                                         .ofName(indexName)
271                                                                                         .onType(ChampObject.ReservedTypes.ANY.toString())
272                                                                                         .forField(index.getFieldKeys()[0].name())
273                                                                                         .build());
274         }
275
276         @Override
277         public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
278                 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
279
280                 final TitanManagement createIndexMgmt = getGraph().openManagement();
281                 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
282                 
283                 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
284         
285                         private ChampRelationshipIndex next;
286
287                         @Override
288                         public boolean hasNext() {
289                                 if (indices.hasNext()) {
290                                         final TitanGraphIndex index = indices.next();
291
292                                         next = ChampRelationshipIndex.create()
293                                                         .ofName(index.name())
294                                                         .onType(ChampRelationship.ReservedTypes.ANY.toString())
295                                                         .forField(index.getFieldKeys()[0].name())
296                                                         .build();
297                                         return true;
298                                 }
299
300                                 next = null;
301                                 return false;
302                         }
303
304                         @Override
305                         public ChampRelationshipIndex next() {
306                                 if (next == null) throw new NoSuchElementException();
307                                 
308                                 return next;
309                         }
310                 };
311
312                 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
313                 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
314         }
315
316         public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
317                 if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
318
319                 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
320         }
321
322         private Cardinality getTitanCardinality(ChampCardinality cardinality) {
323                 switch (cardinality) {
324                 case LIST:
325                         return Cardinality.LIST;
326                 case SET:
327                         return Cardinality.SET;
328                 case SINGLE:
329                         return Cardinality.SINGLE;
330                 default:
331                         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
332                 }
333         }
334
335         private void awaitIndexCreation(String indexName) {
336         //Wait for the index to become available
337                 try {
338                         if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
339                                                         .status(SchemaStatus.ENABLED)
340                                                         .timeout(1, ChronoUnit.SECONDS)
341                                                         .call()
342                                                         .getSucceeded()) {
343                                 return; //Empty graphs immediately ENABLE indices
344                         }
345
346                         if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
347                                                         .status(SchemaStatus.REGISTERED)
348                                                         .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
349                                                         .call()
350                                                         .getSucceeded()) {
351                                 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
352                                 return;
353                         }
354                 } catch (InterruptedException e) {
355                         LOGGER.warn("Interrupted while waiting for object index creation status");
356                         return;
357                 }
358
359                 //Reindex the existing data
360
361                 try {
362                         final TitanManagement updateIndexMgmt = graph.openManagement();
363                         updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
364                         updateIndexMgmt.commit();
365                 } catch (InterruptedException e) {
366                         LOGGER.warn("Interrupted while reindexing for object index");
367                         return;
368                 } catch (ExecutionException e) {
369                         LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
370                 }
371                 
372                 try {
373                         ManagementSystem.awaitGraphIndexStatus(graph, indexName)
374                                                         .status(SchemaStatus.ENABLED)
375                                                         .timeout(10, ChronoUnit.MINUTES)
376                                                         .call();
377                 } catch (InterruptedException e) {
378                         LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
379                         return;
380                 }
381         }
382
383         @Override
384         public ChampCapabilities capabilities() {
385                 return CAPABILITIES;
386         }
387
388         @Override
389         public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
390                 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
391
392                 final ChampSchema currentSchema = retrieveSchema();
393                 final TitanManagement mgmt = getGraph().openManagement();
394
395                 try {
396                         for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
397                                 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
398                                         final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
399                                         
400                                         if (currentObjConstraint.isPresent()) {
401                                                 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
402
403                                                 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
404                                                         throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
405                                                 }
406                                         }
407
408                                         final String newPropertyKeyName = propConstraint.getField().getName();
409
410                                         if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
411                                         
412                                         mgmt.makePropertyKey(newPropertyKeyName)
413                                                 .dataType(propConstraint.getField().getJavaType())
414                                                 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
415                                                 .make();
416                                 }
417                         }
418
419                         for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
420
421                                 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
422
423                                 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
424         
425                                         if (currentRelConstraint.isPresent()) {
426                                                 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
427
428                                                 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
429                                                         throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
430                                                 }
431                                         }
432
433                                         final String newPropertyKeyName = propConstraint.getField().getName();
434
435                                         if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
436
437                                         mgmt.makePropertyKey(newPropertyKeyName)
438                                                         .dataType(propConstraint.getField().getJavaType())
439                                                         .cardinality(getTitanCardinality(propConstraint.getCardinality()))
440                                                         .make();
441                                 }
442
443                                 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
444
445                                 if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
446                                                                                         .directed()
447                                                                                         .make();
448                         }
449
450                         mgmt.commit();
451
452                         super.storeSchema(schema);
453                 } catch (SchemaViolationException | ChampSchemaViolationException e) {
454                         mgmt.rollback();
455                         throw new ChampSchemaViolationException(e);
456                 }
457         }
458 }