2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champtitan.graph.impl;
23 import java.security.SecureRandom;
24 import java.time.temporal.ChronoUnit;
26 import java.util.Map.Entry;
27 import java.util.concurrent.ExecutionException;
28 import java.util.stream.Stream;
29 import java.util.stream.StreamSupport;
31 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
32 import org.apache.tinkerpop.gremlin.structure.Edge;
33 import org.apache.tinkerpop.gremlin.structure.Vertex;
34 import org.onap.aai.champcore.ChampCapabilities;
35 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
36 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
37 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
38 import org.onap.aai.champcore.model.ChampCardinality;
39 import org.onap.aai.champcore.model.ChampField;
40 import org.onap.aai.champcore.model.ChampObject;
41 import org.onap.aai.champcore.model.ChampObjectConstraint;
42 import org.onap.aai.champcore.model.ChampObjectIndex;
43 import org.onap.aai.champcore.model.ChampPropertyConstraint;
44 import org.onap.aai.champcore.model.ChampRelationship;
45 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
46 import org.onap.aai.champcore.model.ChampRelationshipIndex;
47 import org.onap.aai.champcore.model.ChampSchema;
48 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
49 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 import com.thinkaurelius.titan.core.Cardinality;
54 import com.thinkaurelius.titan.core.EdgeLabel;
55 import com.thinkaurelius.titan.core.PropertyKey;
56 import com.thinkaurelius.titan.core.SchemaViolationException;
57 import com.thinkaurelius.titan.core.TitanEdge;
58 import com.thinkaurelius.titan.core.TitanFactory;
59 import com.thinkaurelius.titan.core.TitanGraph;
60 import com.thinkaurelius.titan.core.TitanVertex;
61 import com.thinkaurelius.titan.core.schema.SchemaAction;
62 import com.thinkaurelius.titan.core.schema.SchemaStatus;
63 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
64 import com.thinkaurelius.titan.core.schema.TitanManagement;
65 import com.thinkaurelius.titan.core.schema.TitanManagement.IndexBuilder;
66 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
68 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
70 private static final Logger LOGGER = LoggerFactory.getLogger(TitanChampGraphImpl.class);
71 private static final String TITAN_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
72 private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
73 private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
74 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
75 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
77 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
80 public boolean canDeleteObjectIndices() {
85 public boolean canDeleteRelationshipIndices() {
/** The Titan graph opened by the constructor. */
private final TitanGraph graph;

/**
 * Opens a Titan graph from the configuration collected by the given builder.
 *
 * Every builder property is copied into the Titan configuration, a random
 * unique-instance-id suffix is added, and the graph name is mapped onto the
 * backend-specific namespace setting (Cassandra keyspace or HBase table).
 *
 * @param builder source of the graph name and the Titan configuration
 * @throws RuntimeException if {@code storage.backend} is BerkeleyDB (which cannot
 *         host multiple graphs on one DB) or is not one of the recognised backends
 */
public TitanChampGraphImpl(Builder builder) {
  super(builder.graphConfiguration);

  final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();

  for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
    titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
  }

  // nextInt(Short.MAX_VALUE) is always in [0, Short.MAX_VALUE), so the short cast is lossless.
  titanGraphBuilder.set(TITAN_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE) + ""));

  final Object storageBackend = builder.graphConfiguration.get("storage.backend");

  if ("cassandra".equals(storageBackend)
      || "cassandrathrift".equals(storageBackend)
      || "astyanax".equals(storageBackend)
      || "embeddedcassandra".equals(storageBackend)) {
    titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
  } else if ("hbase".equals(storageBackend)) {
    titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
  } else if ("berkeleyje".equals(storageBackend) || "berkleyje".equals(storageBackend)) {
    // Titan's shorthand is "berkeleyje"; the historical misspelling is still
    // recognised so that existing configurations get the same diagnostic.
    throw new RuntimeException("storage.backend=" + storageBackend
        + " cannot handle multiple graphs on a single DB, not usable");
  } else if ("inmemory".equals(storageBackend)) {
    // The in-memory backend needs no graph-name mapping.
  } else {
    throw new RuntimeException("Unknown storage.backend=" + storageBackend);
  }

  LOGGER.info("Instantiated data access layer for Titan graph data store with backend: {}", storageBackend);

  this.graph = titanGraphBuilder.open();
}
123 public static class Builder {
124 private final String graphName;
126 private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
128 public Builder(String graphName) {
129 this.graphName = graphName;
132 public Builder(String graphName, Map<String, Object> properties) {
133 this.graphName = graphName;
134 properties(properties);
137 public Builder properties(Map<String, Object> properties) {
138 if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
139 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
140 + " in initial configuration - this path is used"
141 + " to specify graph names");
143 this.graphConfiguration.putAll(properties);
147 public Builder property(String path, Object value) {
148 if (path.equals(TITAN_CASSANDRA_KEYSPACE))
149 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
150 + " in initial configuration - this path is used"
151 + " to specify graph names");
152 graphConfiguration.put(path, value);
156 public TitanChampGraphImpl build() {
157 return new TitanChampGraphImpl(this);
162 protected TitanGraph getGraph() {
167 protected ChampSchemaEnforcer getSchemaEnforcer() {
168 return SCHEMA_ENFORCER;
171 public void executeStoreObjectIndex(ChampObjectIndex index) {
172 if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
174 final TitanGraph graph = getGraph();
175 final TitanManagement createIndexMgmt = graph.openManagement();
177 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
178 createIndexMgmt.rollback();
179 return; //Ignore, index already exists
182 IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
183 for (ChampField field : index.getFields()) {
184 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
187 ib.buildCompositeIndex();
188 createIndexMgmt.commit();
191 awaitIndexCreation(index.getName());
195 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
196 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
198 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
199 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
201 if (index == null) return Optional.empty();
202 if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
203 List<String> fieldNames = new ArrayList<String>();
204 for (int i = 0; i < index.getFieldKeys().length; i++) {
205 fieldNames.add(index.getFieldKeys()[i].name());
208 return Optional.of(ChampObjectIndex.create()
210 .onType(ChampObject.ReservedTypes.ANY.toString())
211 .forFields(fieldNames)
216 public Stream<ChampObjectIndex> retrieveObjectIndices() {
217 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
219 final TitanManagement createIndexMgmt = getGraph().openManagement();
220 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
222 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
224 private ChampObjectIndex next;
227 public boolean hasNext() {
228 if (indices.hasNext()) {
229 final TitanGraphIndex index = indices.next();
231 List<String> fieldNames = new ArrayList<String>();
232 for (int i = 0; i < index.getFieldKeys().length; i++) {
233 fieldNames.add(index.getFieldKeys()[i].name());
236 next = ChampObjectIndex.create()
237 .ofName(index.name())
238 .onType(ChampObject.ReservedTypes.ANY.toString())
239 .forFields(fieldNames)
249 public ChampObjectIndex next() {
250 if (next == null) throw new NoSuchElementException();
256 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
257 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
260 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
261 if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
263 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
266 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
267 if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
269 final TitanGraph graph = getGraph();
270 final TitanManagement createIndexMgmt = graph.openManagement();
271 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
273 if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
274 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
276 createIndexMgmt.commit();
279 awaitIndexCreation(index.getName());
283 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
284 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
286 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
287 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
289 if (index == null) return Optional.empty();
290 if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
292 return Optional.of(ChampRelationshipIndex.create()
294 .onType(ChampObject.ReservedTypes.ANY.toString())
295 .forField(index.getFieldKeys()[0].name())
300 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
301 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
303 final TitanManagement createIndexMgmt = getGraph().openManagement();
304 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
306 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
308 private ChampRelationshipIndex next;
311 public boolean hasNext() {
312 if (indices.hasNext()) {
313 final TitanGraphIndex index = indices.next();
315 next = ChampRelationshipIndex.create()
316 .ofName(index.name())
317 .onType(ChampRelationship.ReservedTypes.ANY.toString())
318 .forField(index.getFieldKeys()[0].name())
328 public ChampRelationshipIndex next() {
329 if (next == null) throw new NoSuchElementException();
335 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
336 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
339 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
340 if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
342 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
345 private Cardinality getTitanCardinality(ChampCardinality cardinality) {
346 switch (cardinality) {
348 return Cardinality.LIST;
350 return Cardinality.SET;
352 return Cardinality.SINGLE;
354 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
358 private void awaitIndexCreation(String indexName) {
359 //Wait for the index to become available
361 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
362 .status(SchemaStatus.ENABLED)
363 .timeout(1, ChronoUnit.SECONDS)
366 return; //Empty graphs immediately ENABLE indices
369 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
370 .status(SchemaStatus.REGISTERED)
371 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
374 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
377 } catch (InterruptedException e) {
378 LOGGER.warn("Interrupted while waiting for object index creation status");
379 Thread.currentThread().interrupt();
383 //Reindex the existing data
386 final TitanManagement updateIndexMgmt = graph.openManagement();
387 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
388 updateIndexMgmt.commit();
389 } catch (InterruptedException e) {
390 LOGGER.warn("Interrupted while reindexing for object index");
391 Thread.currentThread().interrupt();
393 } catch (ExecutionException e) {
394 LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
398 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
399 .status(SchemaStatus.ENABLED)
400 .timeout(10, ChronoUnit.MINUTES)
402 } catch (InterruptedException e) {
403 LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
404 Thread.currentThread().interrupt();
410 public ChampCapabilities capabilities() {
415 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
416 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
418 final ChampSchema currentSchema = retrieveSchema();
419 final TitanManagement mgmt = getGraph().openManagement();
422 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
423 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
424 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
426 if (currentObjConstraint.isPresent()) {
427 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
429 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
430 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
434 final String newPropertyKeyName = propConstraint.getField().getName();
436 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
438 mgmt.makePropertyKey(newPropertyKeyName)
439 .dataType(propConstraint.getField().getJavaType())
440 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
445 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
447 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
449 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
451 if (currentRelConstraint.isPresent()) {
452 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
454 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
455 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
459 final String newPropertyKeyName = propConstraint.getField().getName();
461 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
463 mgmt.makePropertyKey(newPropertyKeyName)
464 .dataType(propConstraint.getField().getJavaType())
465 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
469 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
471 if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
478 super.storeSchema(schema);
479 } catch (SchemaViolationException | ChampSchemaViolationException e) {
481 throw new ChampSchemaViolationException(e);
486 public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
487 return query.hasLabel((String) type);
491 public void createDefaultIndexes() {
492 LOGGER.error("No default indexes being created");