/*
 * ============LICENSE_START==========================================
 * ===================================================================
 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017-2018 Amdocs
 * ===================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END============================================
 */
package org.onap.aai.champjanus.graph.impl;

import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.*;
import org.janusgraph.core.schema.JanusGraphIndex;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
import org.janusgraph.core.schema.SchemaAction;
import org.janusgraph.core.schema.SchemaStatus;
import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.onap.aai.champcore.ChampCapabilities;
import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
import org.onap.aai.champcore.model.*;
import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.SecureRandom;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
50 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
51 private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
52 private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
53 private static final String JANUS_CQL_KEYSPACE = "storage.cql.keyspace";
54 private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
55 private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
56 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
57 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 45;
59 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
62 public boolean canDeleteObjectIndices() {
67 public boolean canDeleteRelationshipIndices() {
72 private JanusGraph graph;
73 private final JanusGraphFactory.Builder janusGraphBuilder;
/**
 * Configures the JanusGraph factory from the builder and makes a first attempt to open the graph.
 *
 * <p>All builder properties are copied to the factory, a random instance-id suffix is set, and
 * the graph name is written to the keyspace/table property that matches {@code storage.backend}.
 * A failure to open the graph is logged and otherwise ignored, so construction still succeeds.
 *
 * @param builder source of the graph name and the graph configuration
 * @throws RuntimeException if {@code storage.backend} is the BerkeleyDB JE backend or is not one
 *         of the recognised values
 */
public JanusChampGraphImpl(Builder builder) {
  super(builder.graphConfiguration);
  janusGraphBuilder = JanusGraphFactory.build();

  for (Map.Entry<String, Object> property : builder.graphConfiguration.entrySet()) {
    janusGraphBuilder.set(property.getKey(), property.getValue());
  }

  // nextInt(Short.MAX_VALUE) yields 0..32766, so the value always fits the "short" range
  // that the original cast enforced.
  janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX,
      String.valueOf(new SecureRandom().nextInt(Short.MAX_VALUE)));

  final Object storageBackend = builder.graphConfiguration.get("storage.backend");

  if ("cassandra".equals(storageBackend)
      || "cassandrathrift".equals(storageBackend)
      || "astyanax".equals(storageBackend)
      || "embeddedcassandra".equals(storageBackend)) {
    janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
  } else if ("cql".equals(storageBackend)) {
    janusGraphBuilder.set(JANUS_CQL_KEYSPACE, builder.graphName);
  } else if ("hbase".equals(storageBackend)) {
    janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
  } else if ("berkeleyje".equals(storageBackend) || "berkleyje".equals(storageBackend)) {
    // The historical check only matched the misspelling "berkleyje"; the correctly spelled
    // backend name now gets the same specific message instead of "Unknown storage.backend".
    throw new RuntimeException("storage.backend=" + storageBackend
        + " cannot handle multiple graphs on a single DB, not usable");
  } else if (!"inmemory".equals(storageBackend)) {
    // "inmemory" needs no keyspace/table property; anything else is rejected.
    throw new RuntimeException("Unknown storage.backend=" + storageBackend);
  }

  try {
    openGraph();
  } catch (Exception ex) {
    // Deliberately not rethrown: the storage backend may not be reachable yet.
    // Opening is retried the next time the graph is needed.
    LOGGER.error("Error opening graph: " + ex.getMessage(), ex);
    return;
  }

  LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
}
116 public static class Builder {
117 private final String graphName;
119 private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
121 public Builder(String graphName) {
122 this.graphName = graphName;
125 public Builder(String graphName, Map<String, Object> properties) {
126 this.graphName = graphName;
127 properties(properties);
130 public Builder properties(Map<String, Object> properties) {
131 if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
132 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
133 + " in initial configuration - this path is used"
134 + " to specify graph names");
137 this.graphConfiguration.putAll(properties);
141 public Builder property(String path, Object value) {
142 if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
143 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
144 + " in initial configuration - this path is used"
145 + " to specify graph names");
147 graphConfiguration.put(path, value);
151 public JanusChampGraphImpl build() {
152 return new JanusChampGraphImpl(this);
157 protected JanusGraph getGraph() {
166 protected ChampSchemaEnforcer getSchemaEnforcer() {
167 return SCHEMA_ENFORCER;
171 public void executeStoreObjectIndex(ChampObjectIndex index) {
173 throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
176 final JanusGraph graph = getGraph();
177 final JanusGraphManagement createIndexMgmt = graph.openManagement();
179 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
180 createIndexMgmt.rollback();
181 LOGGER.info("Index " + index.getName() + " already exists");
182 return; //Ignore, index already exists
185 LOGGER.info("Create index " + index.getName());
186 IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
187 for (ChampField field : index.getFields()) {
188 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
191 ib.buildCompositeIndex();
193 createIndexMgmt.commit();
196 awaitIndexCreation(index.getName());
200 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
202 throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
205 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
206 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
209 return Optional.empty();
211 if (index.getIndexedElement() != JanusGraphVertex.class) {
212 return Optional.empty();
215 List<String> fieldNames = new ArrayList<String>();
216 for (int i = 0; i < index.getFieldKeys().length; i++) {
217 fieldNames.add(index.getFieldKeys()[i].name());
220 return Optional.of(ChampObjectIndex.create()
222 .onType(ChampObject.ReservedTypes.ANY.toString())
223 .forFields(fieldNames)
228 public Stream<ChampObjectIndex> retrieveObjectIndices() {
230 throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
233 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
234 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
236 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
238 private ChampObjectIndex next;
241 public boolean hasNext() {
242 if (indices.hasNext()) {
243 final JanusGraphIndex index = indices.next();
245 List<String> fieldNames = new ArrayList<String>();
246 for (int i = 0; i < index.getFieldKeys().length; i++) {
247 fieldNames.add(index.getFieldKeys()[i].name());
250 next = ChampObjectIndex.create()
251 .ofName(index.name())
252 .onType(ChampObject.ReservedTypes.ANY.toString())
253 .forFields(fieldNames)
263 public ChampObjectIndex next() {
265 throw new NoSuchElementException();
272 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
273 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
277 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
279 throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
282 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
286 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
288 throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
291 final JanusGraph graph = getGraph();
292 final JanusGraphManagement createIndexMgmt = graph.openManagement();
293 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
295 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
296 return; //Ignore, index already exists
299 LOGGER.info("Create edge index " + index.getName());
300 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
302 createIndexMgmt.commit();
305 awaitIndexCreation(index.getName());
309 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
311 throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
314 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
315 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
318 return Optional.empty();
320 if (index.getIndexedElement() != JanusGraphEdge.class) {
321 return Optional.empty();
324 return Optional.of(ChampRelationshipIndex.create()
326 .onType(ChampObject.ReservedTypes.ANY.toString())
327 .forField(index.getFieldKeys()[0].name())
332 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
334 throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
337 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
338 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
340 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
342 private ChampRelationshipIndex next;
345 public boolean hasNext() {
346 if (indices.hasNext()) {
347 final JanusGraphIndex index = indices.next();
349 next = ChampRelationshipIndex.create()
350 .ofName(index.name())
351 .onType(ChampRelationship.ReservedTypes.ANY.toString())
352 .forField(index.getFieldKeys()[0].name())
362 public ChampRelationshipIndex next() {
364 throw new NoSuchElementException();
371 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
372 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
376 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
378 throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
381 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
384 private Cardinality getJanusCardinality(ChampCardinality cardinality) {
385 switch (cardinality) {
387 return Cardinality.LIST;
389 return Cardinality.SET;
391 return Cardinality.SINGLE;
393 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
397 private void awaitIndexCreation(String indexName) {
398 //Wait for the index to become available
400 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
401 .status(SchemaStatus.ENABLED)
402 .timeout(1, ChronoUnit.SECONDS)
405 return; //Empty graphs immediately ENABLE indices
408 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
409 .status(SchemaStatus.REGISTERED)
410 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
413 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
416 } catch (InterruptedException e) {
417 LOGGER.warn("Interrupted while waiting for object index creation status");
418 Thread.currentThread().interrupt();
422 //Reindex the existing data
425 final JanusGraphManagement updateIndexMgmt = graph.openManagement();
426 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
427 updateIndexMgmt.commit();
428 } catch (InterruptedException e) {
429 LOGGER.warn("Interrupted while reindexing for object index");
430 Thread.currentThread().interrupt();
432 } catch (ExecutionException e) {
433 LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
437 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
438 .status(SchemaStatus.ENABLED)
439 .timeout(2, ChronoUnit.MINUTES)
441 } catch (InterruptedException e) {
442 LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
443 Thread.currentThread().interrupt();
449 public ChampCapabilities capabilities() {
453 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
454 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
456 final ChampSchema currentSchema = retrieveSchema();
457 final JanusGraphManagement mgmt = getGraph().openManagement();
460 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
461 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
462 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
464 if (currentObjConstraint.isPresent()) {
465 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
467 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
468 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
472 final String newPropertyKeyName = propConstraint.getField().getName();
474 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
476 mgmt.makePropertyKey(newPropertyKeyName)
477 .dataType(propConstraint.getField().getJavaType())
478 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
483 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
485 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
487 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
489 if (currentRelConstraint.isPresent()) {
490 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
492 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
493 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
497 final String newPropertyKeyName = propConstraint.getField().getName();
499 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
501 mgmt.makePropertyKey(newPropertyKeyName)
502 .dataType(propConstraint.getField().getJavaType())
503 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
507 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
509 if (edgeLabel != null) {
510 mgmt.makeEdgeLabel(relConstraint.getType())
518 super.storeSchema(schema);
519 } catch (SchemaViolationException | ChampSchemaViolationException e) {
521 throw new ChampSchemaViolationException(e);
525 private synchronized void openGraph() {
527 graph = janusGraphBuilder.open();
531 public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
532 return query.hasLabel(type);
/**
 * Creates the default composite indexes when they are missing and waits for each new index:
 * a vertex index and an edge index ({@code rel-key-uuid}) on the key property, and a vertex
 * index on the node-type property.
 *
 * @throws IllegalStateException if shutdown has been initiated
 */
@Override
public void createDefaultIndexes() {
  if (isShutdown()) {
    // The message previously named storeObjectIndex(), a copy-paste slip.
    throw new IllegalStateException("Cannot call createDefaultIndexes() after shutdown has been initiated");
  }

  final String edgeIndexName = "rel-key-uuid";

  final JanusGraph graph = getGraph();
  JanusGraphManagement createIndexMgmt = graph.openManagement();

  final boolean vertexIndexExists = createIndexMgmt.getGraphIndex(KEY_PROPERTY_NAME) != null;
  final boolean edgeIndexExists = createIndexMgmt.getGraphIndex(edgeIndexName) != null;
  final boolean nodeTypeIndexExists = createIndexMgmt.getGraphIndex(NODE_TYPE_PROPERTY_NAME) != null;

  if (!vertexIndexExists || !edgeIndexExists) {
    final PropertyKey keyProperty = createIndexMgmt.getOrCreatePropertyKey(KEY_PROPERTY_NAME);

    if (!vertexIndexExists) {
      LOGGER.info("Create Index " + KEY_PROPERTY_NAME);
      createIndexMgmt.buildIndex(KEY_PROPERTY_NAME, Vertex.class).addKey(keyProperty).buildCompositeIndex();
    }
    if (!edgeIndexExists) {
      LOGGER.info("Create Index " + edgeIndexName);
      createIndexMgmt.buildIndex(edgeIndexName, Edge.class).addKey(keyProperty).buildCompositeIndex();
    }
    createIndexMgmt.commit();

    // Wait only after the commit, and only for the indexes that were just created.
    if (!vertexIndexExists) {
      awaitIndexCreation(KEY_PROPERTY_NAME);
    }
    if (!edgeIndexExists) {
      awaitIndexCreation(edgeIndexName);
    }
  } else {
    createIndexMgmt.rollback();
    LOGGER.info("Index " + KEY_PROPERTY_NAME + " and " + edgeIndexName + " already exist");
  }

  if (!nodeTypeIndexExists) {
    LOGGER.info("Create Index " + NODE_TYPE_PROPERTY_NAME);
    // The first management transaction is closed by now (commit or rollback), so open a new one.
    createIndexMgmt = graph.openManagement();
    final PropertyKey nodeTypeProperty = createIndexMgmt.getOrCreatePropertyKey(NODE_TYPE_PROPERTY_NAME);
    createIndexMgmt.buildIndex(NODE_TYPE_PROPERTY_NAME, Vertex.class).addKey(nodeTypeProperty).buildCompositeIndex();
    createIndexMgmt.commit();
    awaitIndexCreation(NODE_TYPE_PROPERTY_NAME);
  }
}
// Closes class JanusChampGraphImpl; the original closing brace was lost with the other dropped lines.
}