2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 package org.openecomp.aai.champ.graph.impl;
24 import java.time.temporal.ChronoUnit;
25 import java.util.HashMap;
26 import java.util.Iterator;
28 import java.util.Map.Entry;
29 import java.util.NoSuchElementException;
30 import java.util.Optional;
31 import java.util.Spliterator;
32 import java.util.Spliterators;
33 import java.util.concurrent.ExecutionException;
34 import java.util.stream.Stream;
35 import java.util.stream.StreamSupport;
37 import org.apache.tinkerpop.gremlin.structure.Edge;
38 import org.apache.tinkerpop.gremlin.structure.Vertex;
39 import org.openecomp.aai.champ.ChampCapabilities;
40 import org.openecomp.aai.champ.exceptions.ChampIndexNotExistsException;
41 import org.openecomp.aai.champ.exceptions.ChampSchemaViolationException;
42 import org.openecomp.aai.champ.model.ChampCardinality;
43 import org.openecomp.aai.champ.model.ChampObject;
44 import org.openecomp.aai.champ.model.ChampObjectConstraint;
45 import org.openecomp.aai.champ.model.ChampObjectIndex;
46 import org.openecomp.aai.champ.model.ChampPropertyConstraint;
47 import org.openecomp.aai.champ.model.ChampRelationship;
48 import org.openecomp.aai.champ.model.ChampRelationshipConstraint;
49 import org.openecomp.aai.champ.model.ChampRelationshipIndex;
50 import org.openecomp.aai.champ.model.ChampSchema;
51 import org.openecomp.aai.champ.schema.ChampSchemaEnforcer;
52 import org.openecomp.aai.champ.schema.DefaultChampSchemaEnforcer;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 import com.thinkaurelius.titan.core.Cardinality;
57 import com.thinkaurelius.titan.core.EdgeLabel;
58 import com.thinkaurelius.titan.core.PropertyKey;
59 import com.thinkaurelius.titan.core.SchemaViolationException;
60 import com.thinkaurelius.titan.core.TitanEdge;
61 import com.thinkaurelius.titan.core.TitanFactory;
62 import com.thinkaurelius.titan.core.TitanGraph;
63 import com.thinkaurelius.titan.core.TitanVertex;
64 import com.thinkaurelius.titan.core.schema.SchemaAction;
65 import com.thinkaurelius.titan.core.schema.SchemaStatus;
66 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
67 import com.thinkaurelius.titan.core.schema.TitanManagement;
68 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
70 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
// Class-wide SLF4J logger.
72 private static final Logger LOGGER = LoggerFactory.getLogger(TitanChampGraphImpl.class);
// Titan configuration path that the constructor sets to the graph name for Cassandra-family backends.
73 private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
// Titan configuration path that the constructor sets to the graph name for the HBase backend.
74 private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
// Shared schema enforcer instance handed out by getSchemaEnforcer().
75 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
// Seconds awaitIndexCreation() waits for a new index to reach the REGISTERED status.
76 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
// Capability descriptor for this implementation, built as an anonymous ChampCapabilities.
// NOTE(review): the bodies of the two methods below are not visible in this view. The
// executeDelete*Index methods of this class always throw UnsupportedOperationException, so
// both flags presumably report false - confirm against the full file.
78 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
// Reports whether object (vertex) indices can be deleted.
81 public boolean canDeleteObjectIndices() {
// Reports whether relationship (edge) indices can be deleted.
86 public boolean canDeleteRelationshipIndices() {
// Titan graph opened by the constructor; awaitIndexCreation() uses it directly.
91 private final TitanGraph graph;
/**
 * Opens the Titan graph described by the given builder.
 *
 * <p>Every configured property is copied into a Titan factory builder. The graph name is
 * then written into the backend-specific location that separates graphs from each other:
 * the keyspace for Cassandra-family backends and the table for HBase. The in-memory
 * backend needs no such setting.
 *
 * @param builder source of the graph name and the Titan configuration properties
 * @throws RuntimeException if {@code storage.backend} is missing, unknown, or is the
 *         BerkeleyDB backend (accepted under both the {@code berkeleyje} shorthand and the
 *         historical {@code berkleyje} spelling), which cannot host several graphs in one DB
 */
private TitanChampGraphImpl(Builder builder) {
    super(builder.graphConfiguration);

    final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();

    for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
        titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
    }

    // May be null when the caller never configured a backend. The comparisons below are
    // written constant-first so that case ends in the descriptive "Unknown storage.backend"
    // error instead of a NullPointerException.
    final Object storageBackend = builder.graphConfiguration.get("storage.backend");

    if ("cassandra".equals(storageBackend)
            || "cassandrathrift".equals(storageBackend)
            || "astyanax".equals(storageBackend)
            || "embeddedcassandra".equals(storageBackend)) {
        titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
    } else if ("hbase".equals(storageBackend)) {
        titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
    } else if ("berkleyje".equals(storageBackend) || "berkeleyje".equals(storageBackend)) {
        // Both spellings are rejected with the same explanation; the message echoes the
        // configured value, so the historical spelling produces the historical message.
        throw new RuntimeException("storage.backend=" + storageBackend
                + " cannot handle multiple graphs on a single DB, not usable");
    } else if (!"inmemory".equals(storageBackend)) {
        throw new RuntimeException("Unknown storage.backend=" + storageBackend);
    }

    this.graph = titanGraphBuilder.open();
}
120 public static class Builder {
121 private final String graphName;
123 private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
125 public Builder(String graphName) {
126 this.graphName = graphName;
129 public Builder properties(Map<String, Object> properties) {
130 if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
131 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
132 + " in initial configuration - this path is used"
133 + " to specify graph names");
135 this.graphConfiguration.putAll(properties);
139 public Builder property(String path, Object value) {
140 if (path.equals(TITAN_CASSANDRA_KEYSPACE))
141 throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
142 + " in initial configuration - this path is used"
143 + " to specify graph names");
144 graphConfiguration.put(path, value);
148 public TitanChampGraphImpl build() {
149 return new TitanChampGraphImpl(this);
// Accessor for the underlying TitanGraph.
// NOTE(review): the body is not visible in this view; presumably it returns the graph
// field opened by the constructor - confirm against the full file.
154 protected TitanGraph getGraph() {
// Returns the shared, statically created schema enforcer (SCHEMA_ENFORCER).
159 protected ChampSchemaEnforcer getSchemaEnforcer() {
160 return SCHEMA_ENFORCER;
163 public void executeStoreObjectIndex(ChampObjectIndex index) {
164 if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
166 final TitanGraph graph = getGraph();
167 final TitanManagement createIndexMgmt = graph.openManagement();
168 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
170 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
171 createIndexMgmt.rollback();
172 return; //Ignore, index already exists
175 createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
177 createIndexMgmt.commit();
180 awaitIndexCreation(index.getName());
184 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
185 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
187 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
188 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
190 if (index == null) return Optional.empty();
191 if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
193 return Optional.of(ChampObjectIndex.create()
195 .onType(ChampObject.ReservedTypes.ANY.toString())
196 .forField(index.getFieldKeys()[0].name())
201 public Stream<ChampObjectIndex> retrieveObjectIndices() {
202 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
204 final TitanManagement createIndexMgmt = getGraph().openManagement();
205 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
207 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
209 private ChampObjectIndex next;
212 public boolean hasNext() {
213 if (indices.hasNext()) {
214 final TitanGraphIndex index = indices.next();
216 next = ChampObjectIndex.create()
217 .ofName(index.name())
218 .onType(ChampObject.ReservedTypes.ANY.toString())
219 .forField(index.getFieldKeys()[0].name())
229 public ChampObjectIndex next() {
230 if (next == null) throw new NoSuchElementException();
236 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
237 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
240 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
241 if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
243 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
246 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
247 if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
249 final TitanGraph graph = getGraph();
250 final TitanManagement createIndexMgmt = graph.openManagement();
251 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
253 if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
254 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
256 createIndexMgmt.commit();
259 awaitIndexCreation(index.getName());
263 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
264 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
266 final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
267 final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
269 if (index == null) return Optional.empty();
270 if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
272 return Optional.of(ChampRelationshipIndex.create()
274 .onType(ChampObject.ReservedTypes.ANY.toString())
275 .forField(index.getFieldKeys()[0].name())
280 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
281 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
283 final TitanManagement createIndexMgmt = getGraph().openManagement();
284 final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
286 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
288 private ChampRelationshipIndex next;
291 public boolean hasNext() {
292 if (indices.hasNext()) {
293 final TitanGraphIndex index = indices.next();
295 next = ChampRelationshipIndex.create()
296 .ofName(index.name())
297 .onType(ChampRelationship.ReservedTypes.ANY.toString())
298 .forField(index.getFieldKeys()[0].name())
308 public ChampRelationshipIndex next() {
309 if (next == null) throw new NoSuchElementException();
315 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
316 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
319 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
320 if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
322 throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
// Translates a Champ cardinality into a Titan Cardinality (LIST, SET or SINGLE) and
// throws a RuntimeException for a value it does not recognise.
// NOTE(review): the case labels are not visible in this view; presumably each
// ChampCardinality constant maps to the Titan constant of the same name - confirm.
325 private Cardinality getTitanCardinality(ChampCardinality cardinality) {
326 switch (cardinality) {
328 return Cardinality.LIST;
330 return Cardinality.SET;
332 return Cardinality.SINGLE;
334 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
338 private void awaitIndexCreation(String indexName) {
339 //Wait for the index to become available
341 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
342 .status(SchemaStatus.ENABLED)
343 .timeout(1, ChronoUnit.SECONDS)
346 return; //Empty graphs immediately ENABLE indices
349 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
350 .status(SchemaStatus.REGISTERED)
351 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
354 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
357 } catch (InterruptedException e) {
358 LOGGER.warn("Interrupted while waiting for object index creation status");
362 //Reindex the existing data
365 final TitanManagement updateIndexMgmt = graph.openManagement();
366 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
367 updateIndexMgmt.commit();
368 } catch (InterruptedException e) {
369 LOGGER.warn("Interrupted while reindexing for object index");
371 } catch (ExecutionException e) {
372 LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
376 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
377 .status(SchemaStatus.ENABLED)
378 .timeout(10, ChronoUnit.MINUTES)
380 } catch (InterruptedException e) {
381 LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
// Reports what this implementation supports.
// NOTE(review): the body is not visible in this view; presumably it returns the static
// CAPABILITIES instance - confirm against the full file.
387 public ChampCapabilities capabilities() {
// Validates the given schema against the currently stored one, creates any missing Titan
// property keys for object and relationship constraints, and then delegates to
// super.storeSchema(). A property constraint that already exists and differs from the
// new one is rejected with ChampSchemaViolationException.
// NOTE(review): several statement tails and the end of this method are not visible in
// this view, so the comments below describe only what the visible lines show.
392 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
393 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
// Snapshot of the stored schema, used to detect attempts to redefine existing properties.
395 final ChampSchema currentSchema = retrieveSchema();
396 final TitanManagement mgmt = getGraph().openManagement();
// Pass 1: property keys required by object constraints.
399 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
400 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
// NOTE(review): this lookup depends only on objConstraint, yet it is repeated for every
// property; the relationship pass below performs the equivalent lookup once per constraint.
401 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
403 if (currentObjConstraint.isPresent()) {
404 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
// An existing property constraint may be re-submitted unchanged but not altered.
406 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
407 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
411 final String newPropertyKeyName = propConstraint.getField().getName();
413 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
// Define the property key with the constraint's Java type and mapped Titan cardinality.
415 mgmt.makePropertyKey(newPropertyKeyName)
416 .dataType(propConstraint.getField().getJavaType())
417 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
// Pass 2: property keys and edge labels required by relationship constraints.
422 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
424 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
426 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
428 if (currentRelConstraint.isPresent()) {
429 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
// An existing property constraint may be re-submitted unchanged but not altered.
431 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
432 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
436 final String newPropertyKeyName = propConstraint.getField().getName();
438 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
// Define the property key with the constraint's Java type and mapped Titan cardinality.
440 mgmt.makePropertyKey(newPropertyKeyName)
441 .dataType(propConstraint.getField().getJavaType())
442 .cardinality(getTitanCardinality(propConstraint.getCardinality()))
446 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
// NOTE(review): makeEdgeLabel() is invoked only when a label of this type ALREADY exists
// (edgeLabel != null), so a missing label is never created. The guard looks inverted -
// confirm whether "== null" was intended.
448 if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
455 super.storeSchema(schema);
// Titan's SchemaViolationException and Champ's own violation are both rethrown wrapped in
// a ChampSchemaViolationException.
456 } catch (SchemaViolationException | ChampSchemaViolationException e) {
458 throw new ChampSchemaViolationException(e);