2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champjanus.graph.impl;
23 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
24 import org.apache.tinkerpop.gremlin.structure.Edge;
25 import org.apache.tinkerpop.gremlin.structure.Vertex;
26 import org.janusgraph.core.*;
27 import org.janusgraph.core.schema.JanusGraphIndex;
28 import org.janusgraph.core.schema.JanusGraphManagement;
29 import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
30 import org.janusgraph.core.schema.SchemaAction;
31 import org.janusgraph.core.schema.SchemaStatus;
32 import org.janusgraph.graphdb.database.management.ManagementSystem;
33 import org.onap.aai.champcore.ChampCapabilities;
34 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
35 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
36 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
37 import org.onap.aai.champcore.model.*;
38 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
39 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 import java.security.SecureRandom;
44 import java.time.temporal.ChronoUnit;
46 import java.util.concurrent.ExecutionException;
47 import java.util.stream.Stream;
48 import java.util.stream.StreamSupport;
50 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
51 private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
52 private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
53 private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
54 private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
55 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
56 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 45;
58 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
61 public boolean canDeleteObjectIndices() {
66 public boolean canDeleteRelationshipIndices() {
71 private JanusGraph graph;
72 private final JanusGraphFactory.Builder janusGraphBuilder;
74 public JanusChampGraphImpl(Builder builder) {
75 super(builder.graphConfiguration);
76 janusGraphBuilder = JanusGraphFactory.build();
78 for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
79 janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
82 janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE)+""));
84 final Object storageBackend = builder.graphConfiguration.get("storage.backend");
86 if ("cassandra".equals(storageBackend) ||
87 "cassandrathrift".equals(storageBackend) ||
88 "astyanax".equals(storageBackend) ||
89 "embeddedcassandra".equals(storageBackend)) {
91 janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
92 } else if ("hbase".equals(storageBackend)) {
93 janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
94 } else if ("berkleyje".equals(storageBackend)) {
95 throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
96 } else if ("inmemory".equals(storageBackend)) {
98 throw new RuntimeException("Unknown storage.backend=" + storageBackend);
104 catch (Exception ex) {
105 // Swallow exception. Cassandra may not be reachable. Will retry next time we need to use the graph.
106 LOGGER.error("Error opening graph: " + ex.getMessage());
110 LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
113 public static class Builder {
114 private final String graphName;
116 private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
118 public Builder(String graphName) {
119 this.graphName = graphName;
122 public Builder(String graphName, Map<String, Object> properties) {
123 this.graphName = graphName;
124 properties(properties);
127 public Builder properties(Map<String, Object> properties) {
128 if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
129 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
130 + " in initial configuration - this path is used"
131 + " to specify graph names");
134 this.graphConfiguration.putAll(properties);
138 public Builder property(String path, Object value) {
139 if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
140 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
141 + " in initial configuration - this path is used"
142 + " to specify graph names");
144 graphConfiguration.put(path, value);
148 public JanusChampGraphImpl build() {
149 return new JanusChampGraphImpl(this);
154 protected JanusGraph getGraph() {
163 protected ChampSchemaEnforcer getSchemaEnforcer() {
164 return SCHEMA_ENFORCER;
168 public void executeStoreObjectIndex(ChampObjectIndex index) {
170 throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
173 final JanusGraph graph = getGraph();
174 final JanusGraphManagement createIndexMgmt = graph.openManagement();
176 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
177 createIndexMgmt.rollback();
178 LOGGER.info("Index " + index.getName() + " already exists");
179 return; //Ignore, index already exists
182 LOGGER.info("Create index " + index.getName());
183 IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
184 for (ChampField field : index.getFields()) {
185 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
188 ib.buildCompositeIndex();
190 createIndexMgmt.commit();
193 awaitIndexCreation(index.getName());
197 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
199 throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
202 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
203 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
206 return Optional.empty();
208 if (index.getIndexedElement() != JanusGraphVertex.class) {
209 return Optional.empty();
212 List<String> fieldNames = new ArrayList<String>();
213 for (int i = 0; i < index.getFieldKeys().length; i++) {
214 fieldNames.add(index.getFieldKeys()[i].name());
217 return Optional.of(ChampObjectIndex.create()
219 .onType(ChampObject.ReservedTypes.ANY.toString())
220 .forFields(fieldNames)
225 public Stream<ChampObjectIndex> retrieveObjectIndices() {
227 throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
230 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
231 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
233 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
235 private ChampObjectIndex next;
238 public boolean hasNext() {
239 if (indices.hasNext()) {
240 final JanusGraphIndex index = indices.next();
242 List<String> fieldNames = new ArrayList<String>();
243 for (int i = 0; i < index.getFieldKeys().length; i++) {
244 fieldNames.add(index.getFieldKeys()[i].name());
247 next = ChampObjectIndex.create()
248 .ofName(index.name())
249 .onType(ChampObject.ReservedTypes.ANY.toString())
250 .forFields(fieldNames)
260 public ChampObjectIndex next() {
262 throw new NoSuchElementException();
269 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
270 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
274 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
276 throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
279 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
283 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
285 throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
288 final JanusGraph graph = getGraph();
289 final JanusGraphManagement createIndexMgmt = graph.openManagement();
290 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
292 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
293 return; //Ignore, index already exists
296 LOGGER.info("Create edge index " + index.getName());
297 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
299 createIndexMgmt.commit();
302 awaitIndexCreation(index.getName());
306 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
308 throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
311 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
312 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
315 return Optional.empty();
317 if (index.getIndexedElement() != JanusGraphEdge.class) {
318 return Optional.empty();
321 return Optional.of(ChampRelationshipIndex.create()
323 .onType(ChampObject.ReservedTypes.ANY.toString())
324 .forField(index.getFieldKeys()[0].name())
329 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
331 throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
334 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
335 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
337 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
339 private ChampRelationshipIndex next;
342 public boolean hasNext() {
343 if (indices.hasNext()) {
344 final JanusGraphIndex index = indices.next();
346 next = ChampRelationshipIndex.create()
347 .ofName(index.name())
348 .onType(ChampRelationship.ReservedTypes.ANY.toString())
349 .forField(index.getFieldKeys()[0].name())
359 public ChampRelationshipIndex next() {
361 throw new NoSuchElementException();
368 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
369 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
373 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
375 throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
378 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
381 private Cardinality getJanusCardinality(ChampCardinality cardinality) {
382 switch (cardinality) {
384 return Cardinality.LIST;
386 return Cardinality.SET;
388 return Cardinality.SINGLE;
390 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
394 private void awaitIndexCreation(String indexName) {
395 //Wait for the index to become available
397 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
398 .status(SchemaStatus.ENABLED)
399 .timeout(1, ChronoUnit.SECONDS)
402 return; //Empty graphs immediately ENABLE indices
405 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
406 .status(SchemaStatus.REGISTERED)
407 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
410 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
413 } catch (InterruptedException e) {
414 LOGGER.warn("Interrupted while waiting for object index creation status");
415 Thread.currentThread().interrupt();
419 //Reindex the existing data
422 final JanusGraphManagement updateIndexMgmt = graph.openManagement();
423 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
424 updateIndexMgmt.commit();
425 } catch (InterruptedException e) {
426 LOGGER.warn("Interrupted while reindexing for object index");
427 Thread.currentThread().interrupt();
429 } catch (ExecutionException e) {
430 LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
434 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
435 .status(SchemaStatus.ENABLED)
436 .timeout(2, ChronoUnit.MINUTES)
438 } catch (InterruptedException e) {
439 LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
440 Thread.currentThread().interrupt();
446 public ChampCapabilities capabilities() {
450 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
451 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
453 final ChampSchema currentSchema = retrieveSchema();
454 final JanusGraphManagement mgmt = getGraph().openManagement();
457 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
458 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
459 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
461 if (currentObjConstraint.isPresent()) {
462 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
464 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
465 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
469 final String newPropertyKeyName = propConstraint.getField().getName();
471 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
473 mgmt.makePropertyKey(newPropertyKeyName)
474 .dataType(propConstraint.getField().getJavaType())
475 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
480 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
482 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
484 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
486 if (currentRelConstraint.isPresent()) {
487 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
489 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
490 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
494 final String newPropertyKeyName = propConstraint.getField().getName();
496 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
498 mgmt.makePropertyKey(newPropertyKeyName)
499 .dataType(propConstraint.getField().getJavaType())
500 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
504 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
506 if (edgeLabel != null) {
507 mgmt.makeEdgeLabel(relConstraint.getType())
515 super.storeSchema(schema);
516 } catch (SchemaViolationException | ChampSchemaViolationException e) {
518 throw new ChampSchemaViolationException(e);
522 private synchronized void openGraph() {
524 graph = janusGraphBuilder.open();
528 public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
529 return query.hasLabel(type);
534 public void createDefaultIndexes() {
536 throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
539 final String EDGE_IX_NAME = "rel-key-uuid";
541 final JanusGraph graph = getGraph();
542 JanusGraphManagement createIndexMgmt = graph.openManagement();
544 boolean vertexIndexExists = (createIndexMgmt.getGraphIndex(KEY_PROPERTY_NAME) != null);
545 boolean edgeIndexExists = (createIndexMgmt.getGraphIndex(EDGE_IX_NAME) != null);
546 boolean nodeTypeIndexExists = (createIndexMgmt.getGraphIndex(NODE_TYPE_PROPERTY_NAME) != null);
548 if (!vertexIndexExists || !edgeIndexExists) {
549 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(KEY_PROPERTY_NAME);
551 if (!vertexIndexExists) {
552 LOGGER.info("Create Index " + KEY_PROPERTY_NAME);
553 createIndexMgmt.buildIndex(KEY_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
555 if (!edgeIndexExists) {
556 LOGGER.info("Create Index " + EDGE_IX_NAME);
557 createIndexMgmt.buildIndex(EDGE_IX_NAME, Edge.class).addKey(pk).buildCompositeIndex();
559 createIndexMgmt.commit();
561 if (!vertexIndexExists) {
562 awaitIndexCreation(KEY_PROPERTY_NAME);
564 if (!edgeIndexExists) {
565 awaitIndexCreation(EDGE_IX_NAME);
569 createIndexMgmt.rollback();
570 LOGGER.info("Index " + KEY_PROPERTY_NAME + " and " + EDGE_IX_NAME + " already exist");
575 if (!nodeTypeIndexExists) {
576 LOGGER.info("Create Index " + NODE_TYPE_PROPERTY_NAME);
577 createIndexMgmt = graph.openManagement();
578 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(NODE_TYPE_PROPERTY_NAME);
579 createIndexMgmt.buildIndex(NODE_TYPE_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
580 createIndexMgmt.commit();
581 awaitIndexCreation(NODE_TYPE_PROPERTY_NAME);