2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champtitan.graph.impl;
23 import java.security.SecureRandom;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
30 import java.util.Map.Entry;
31 import java.util.NoSuchElementException;
32 import java.util.Optional;
33 import java.util.Spliterator;
34 import java.util.Spliterators;
35 import java.util.concurrent.ExecutionException;
36 import java.util.stream.Stream;
37 import java.util.stream.StreamSupport;
39 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
40 import org.apache.tinkerpop.gremlin.structure.Edge;
41 import org.apache.tinkerpop.gremlin.structure.Vertex;
42 import org.onap.aai.champcore.ChampCapabilities;
43 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
46 import org.onap.aai.champcore.model.ChampCardinality;
47 import org.onap.aai.champcore.model.ChampField;
48 import org.onap.aai.champcore.model.ChampObject;
49 import org.onap.aai.champcore.model.ChampObjectConstraint;
50 import org.onap.aai.champcore.model.ChampObjectIndex;
51 import org.onap.aai.champcore.model.ChampPropertyConstraint;
52 import org.onap.aai.champcore.model.ChampRelationship;
53 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
54 import org.onap.aai.champcore.model.ChampRelationshipIndex;
55 import org.onap.aai.champcore.model.ChampSchema;
56 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
57 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
58 import org.onap.aai.cl.api.Logger;
59 import org.onap.aai.cl.eelf.LoggerFactory;
61 import com.thinkaurelius.titan.core.Cardinality;
62 import com.thinkaurelius.titan.core.EdgeLabel;
63 import com.thinkaurelius.titan.core.PropertyKey;
64 import com.thinkaurelius.titan.core.SchemaViolationException;
65 import com.thinkaurelius.titan.core.TitanEdge;
66 import com.thinkaurelius.titan.core.TitanFactory;
67 import com.thinkaurelius.titan.core.TitanGraph;
68 import com.thinkaurelius.titan.core.TitanVertex;
69 import com.thinkaurelius.titan.core.schema.SchemaAction;
70 import com.thinkaurelius.titan.core.schema.SchemaStatus;
71 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
72 import com.thinkaurelius.titan.core.schema.TitanManagement;
73 import com.thinkaurelius.titan.core.schema.TitanManagement.IndexBuilder;
74 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
76 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
78 private static final Logger LOGGER = LoggerFactory.getInstance().getLogger(TitanChampGraphImpl.class);
79 private static final String TITAN_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
80 private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
81 private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
82 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
83 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
85 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
88 public boolean canDeleteObjectIndices() {
93 public boolean canDeleteRelationshipIndices() {
98 private final TitanGraph graph;
/**
 * Opens a Titan graph from the configuration collected by the given builder.
 *
 * <p>Every builder property is copied into the Titan configuration, a random
 * unique-instance suffix is added, and the graph name is mapped onto the
 * backend-specific setting (Cassandra keyspace or HBase table).
 *
 * @param builder source of the graph name and the Titan configuration properties
 * @throws RuntimeException if {@code storage.backend} is BerkeleyDB JE (unusable for
 *         multiple graphs) or is not one of the backends recognised here
 */
public TitanChampGraphImpl(Builder builder) {
  super(builder.graphConfiguration);
  final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();

  for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
    titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
  }

  // nextInt(Short.MAX_VALUE) already yields a value that fits in a short.
  titanGraphBuilder.set(TITAN_UNIQUE_SUFFIX, String.valueOf(new SecureRandom().nextInt(Short.MAX_VALUE)));

  final Object storageBackend = builder.graphConfiguration.get("storage.backend");

  if ("cassandra".equals(storageBackend)
      || "cassandrathrift".equals(storageBackend)
      || "astyanax".equals(storageBackend)
      || "embeddedcassandra".equals(storageBackend)) {
    titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
  } else if ("hbase".equals(storageBackend)) {
    titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
  } else if ("berkeleyje".equals(storageBackend) || "berkleyje".equals(storageBackend)) {
    // Titan spells this backend "berkeleyje"; the historical misspelling is still
    // recognised so that existing callers receive the same error as before.
    throw new RuntimeException("storage.backend=" + storageBackend
        + " cannot handle multiple graphs on a single DB, not usable");
  } else if (!"inmemory".equals(storageBackend)) {
    // "inmemory" needs no graph-name mapping; anything else is unsupported.
    throw new RuntimeException("Unknown storage.backend=" + storageBackend);
  }

  LOGGER.info(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_INFO,
      "Instantiated data access layer for Titan graph data store with backend: " + storageBackend);

  this.graph = titanGraphBuilder.open();
}
132 public static class Builder {
133 private final String graphName;
135 private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
137 public Builder(String graphName) {
138 this.graphName = graphName;
141 public Builder(String graphName, Map<String, Object> properties) {
142 this.graphName = graphName;
143 properties(properties);
146 public Builder properties(Map<String, Object> properties) {
147 if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
148 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
149 + " in initial configuration - this path is used"
150 + " to specify graph names");
152 this.graphConfiguration.putAll(properties);
156 public Builder property(String path, Object value) {
157 if (path.equals(TITAN_CASSANDRA_KEYSPACE))
158 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
159 + " in initial configuration - this path is used"
160 + " to specify graph names");
161 graphConfiguration.put(path, value);
165 public TitanChampGraphImpl build() {
166 return new TitanChampGraphImpl(this);
171 protected TitanGraph getGraph() {
176 protected ChampSchemaEnforcer getSchemaEnforcer() {
177 return SCHEMA_ENFORCER;
180 public void executeStoreObjectIndex(ChampObjectIndex index) {
181 if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
183 final TitanGraph graph = getGraph();
184 final TitanManagement createIndexMgmt = graph.openManagement();
186 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
187 createIndexMgmt.rollback();
188 return; //Ignore, index already exists
191 IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
192 for (ChampField field : index.getFields()) {
193 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
196 ib.buildCompositeIndex();
197 createIndexMgmt.commit();
200 awaitIndexCreation(index.getName());
204 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
205 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
207 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
208 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
210 if (index == null) return Optional.empty();
211 if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
212 List<String> fieldNames = new ArrayList<String>();
213 for (int i = 0; i < index.getFieldKeys().length; i++) {
214 fieldNames.add(index.getFieldKeys()[i].name());
217 return Optional.of(ChampObjectIndex.create()
219 .onType(ChampObject.ReservedTypes.ANY.toString())
220 .forFields(fieldNames)
225 public Stream<ChampObjectIndex> retrieveObjectIndices() {
226 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
228 final TitanManagement createIndexMgmt = getGraph().openManagement();
229 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
231 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
233 private ChampObjectIndex next;
236 public boolean hasNext() {
237 if (indices.hasNext()) {
238 final TitanGraphIndex index = indices.next();
240 List<String> fieldNames = new ArrayList<String>();
241 for (int i = 0; i < index.getFieldKeys().length; i++) {
242 fieldNames.add(index.getFieldKeys()[i].name());
245 next = ChampObjectIndex.create()
246 .ofName(index.name())
247 .onType(ChampObject.ReservedTypes.ANY.toString())
248 .forFields(fieldNames)
258 public ChampObjectIndex next() {
259 if (next == null) throw new NoSuchElementException();
265 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
266 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
269 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
270 if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
272 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
275 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
276 if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
278 final TitanGraph graph = getGraph();
279 final TitanManagement createIndexMgmt = graph.openManagement();
280 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
282 if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
283 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
285 createIndexMgmt.commit();
288 awaitIndexCreation(index.getName());
292 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
293 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
295 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
296 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
298 if (index == null) return Optional.empty();
299 if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
301 return Optional.of(ChampRelationshipIndex.create()
303 .onType(ChampObject.ReservedTypes.ANY.toString())
304 .forField(index.getFieldKeys()[0].name())
309 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
310 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
312 final TitanManagement createIndexMgmt = getGraph().openManagement();
313 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
315 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
317 private ChampRelationshipIndex next;
320 public boolean hasNext() {
321 if (indices.hasNext()) {
322 final TitanGraphIndex index = indices.next();
324 next = ChampRelationshipIndex.create()
325 .ofName(index.name())
326 .onType(ChampRelationship.ReservedTypes.ANY.toString())
327 .forField(index.getFieldKeys()[0].name())
337 public ChampRelationshipIndex next() {
338 if (next == null) throw new NoSuchElementException();
344 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
345 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
348 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
349 if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
351 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
354 private Cardinality getTitanCardinality(ChampCardinality cardinality) {
355 switch (cardinality) {
357 return Cardinality.LIST;
359 return Cardinality.SET;
361 return Cardinality.SINGLE;
363 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
367 private void awaitIndexCreation(String indexName) {
368 //Wait for the index to become available
370 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
371 .status(SchemaStatus.ENABLED)
372 .timeout(1, ChronoUnit.SECONDS)
375 return; //Empty graphs immediately ENABLE indices
378 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
379 .status(SchemaStatus.REGISTERED)
380 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
383 LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
384 "Object index was created, but timed out while waiting for it to be registered");
387 } catch (InterruptedException e) {
388 LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
389 "Interrupted while waiting for object index creation status");
390 Thread.currentThread().interrupt();
394 //Reindex the existing data
397 final TitanManagement updateIndexMgmt = graph.openManagement();
398 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
399 updateIndexMgmt.commit();
400 } catch (InterruptedException e) {
401 LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
402 "Interrupted while reindexing for object index");
403 Thread.currentThread().interrupt();
405 } catch (ExecutionException e) {
406 LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
407 "Exception occurred during reindexing procedure for creating object index " + indexName + ". " + e.getMessage());
411 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
412 .status(SchemaStatus.ENABLED)
413 .timeout(10, ChronoUnit.MINUTES)
415 } catch (InterruptedException e) {
416 LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
417 "Interrupted while waiting for index to transition to ENABLED state");
418 Thread.currentThread().interrupt();
424 public ChampCapabilities capabilities() {
429 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
430 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
432 final ChampSchema currentSchema = retrieveSchema();
433 final TitanManagement mgmt = getGraph().openManagement();
436 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
437 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
438 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
440 if (currentObjConstraint.isPresent()) {
441 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
443 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
444 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
448 final String newPropertyKeyName = propConstraint.getField().getName();
450 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
452 mgmt.makePropertyKey(newPropertyKeyName)
453 .dataType(propConstraint.getField().getJavaType())
454 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
459 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
461 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
463 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
465 if (currentRelConstraint.isPresent()) {
466 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
468 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
469 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
473 final String newPropertyKeyName = propConstraint.getField().getName();
475 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
477 mgmt.makePropertyKey(newPropertyKeyName)
478 .dataType(propConstraint.getField().getJavaType())
479 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
483 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
485 if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
492 super.storeSchema(schema);
493 } catch (SchemaViolationException | ChampSchemaViolationException e) {
495 throw new ChampSchemaViolationException(e);
500 public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
501 return query.hasLabel((String) type);
505 public void createDefaultIndexes() {
506 LOGGER.error(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_ERROR,
507 "No default indexes being created");