2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champjanus.graph.impl;
23 import java.security.SecureRandom;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
30 import java.util.NoSuchElementException;
31 import java.util.Optional;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.concurrent.ExecutionException;
35 import java.util.stream.Stream;
36 import java.util.stream.StreamSupport;
38 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
39 import org.apache.tinkerpop.gremlin.structure.Edge;
40 import org.apache.tinkerpop.gremlin.structure.Vertex;
41 import org.janusgraph.core.Cardinality;
42 import org.janusgraph.core.EdgeLabel;
43 import org.janusgraph.core.JanusGraph;
44 import org.janusgraph.core.JanusGraphEdge;
45 import org.janusgraph.core.JanusGraphFactory;
46 import org.janusgraph.core.JanusGraphVertex;
47 import org.janusgraph.core.PropertyKey;
48 import org.janusgraph.core.SchemaViolationException;
49 import org.janusgraph.core.schema.JanusGraphIndex;
50 import org.janusgraph.core.schema.JanusGraphManagement;
51 import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
52 import org.janusgraph.core.schema.SchemaAction;
53 import org.janusgraph.core.schema.SchemaStatus;
54 import org.janusgraph.graphdb.database.management.ManagementSystem;
55 import org.onap.aai.champcore.ChampCapabilities;
56 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
57 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
58 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
59 import org.onap.aai.champcore.model.ChampCardinality;
60 import org.onap.aai.champcore.model.ChampField;
61 import org.onap.aai.champcore.model.ChampObject;
62 import org.onap.aai.champcore.model.ChampObjectConstraint;
63 import org.onap.aai.champcore.model.ChampObjectIndex;
64 import org.onap.aai.champcore.model.ChampPropertyConstraint;
65 import org.onap.aai.champcore.model.ChampRelationship;
66 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
67 import org.onap.aai.champcore.model.ChampRelationshipIndex;
68 import org.onap.aai.champcore.model.ChampSchema;
69 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
70 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
71 import org.onap.aai.cl.api.Logger;
72 import org.onap.aai.cl.eelf.LoggerFactory;
74 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
75 private static final Logger LOGGER = LoggerFactory.getInstance().getLogger(JanusChampGraphImpl.class);
76 private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
77 private static final String JANUS_CQL_KEYSPACE = "storage.cql.keyspace";
78 private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
79 private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
80 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
81 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 45;
83 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
86 public boolean canDeleteObjectIndices() {
91 public boolean canDeleteRelationshipIndices() {
96 private JanusGraph graph;
97 private final JanusGraphFactory.Builder janusGraphBuilder;
99 public JanusChampGraphImpl(Builder builder) {
100 super(builder.graphConfiguration);
101 janusGraphBuilder = JanusGraphFactory.build();
103 for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
104 janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
107 janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE)+""));
109 final Object storageBackend = builder.graphConfiguration.get("storage.backend");
111 if ("cassandra".equals(storageBackend) ||
112 "cassandrathrift".equals(storageBackend) ||
113 "astyanax".equals(storageBackend) ||
114 "embeddedcassandra".equals(storageBackend)) {
116 janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
117 } else if("cql".equals(storageBackend)){
118 janusGraphBuilder.set(JANUS_CQL_KEYSPACE, builder.graphName);
119 } else if ("hbase".equals(storageBackend)) {
120 janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
121 } else if ("berkleyje".equals(storageBackend)) {
122 throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
123 } else if ("inmemory".equals(storageBackend)) {
125 throw new RuntimeException("Unknown storage.backend=" + storageBackend);
131 catch (Exception ex) {
132 // Swallow exception. Cassandra may not be reachable. Will retry next time we need to use the graph.
133 LOGGER.error(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_ERROR,
134 "Error opening graph: " + ex.getMessage());
138 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
139 "Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
142 public static class Builder {
143 private final String graphName;
145 private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
147 public Builder(String graphName) {
148 this.graphName = graphName;
151 public Builder(String graphName, Map<String, Object> properties) {
152 this.graphName = graphName;
153 properties(properties);
156 public Builder properties(Map<String, Object> properties) {
157 if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
158 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
159 + " in initial configuration - this path is used"
160 + " to specify graph names");
163 this.graphConfiguration.putAll(properties);
167 public Builder property(String path, Object value) {
168 if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
169 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
170 + " in initial configuration - this path is used"
171 + " to specify graph names");
173 graphConfiguration.put(path, value);
177 public JanusChampGraphImpl build() {
178 return new JanusChampGraphImpl(this);
183 protected JanusGraph getGraph() {
192 protected ChampSchemaEnforcer getSchemaEnforcer() {
193 return SCHEMA_ENFORCER;
197 public void executeStoreObjectIndex(ChampObjectIndex index) {
199 throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
202 final JanusGraph graph = getGraph();
203 final JanusGraphManagement createIndexMgmt = graph.openManagement();
205 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
206 createIndexMgmt.rollback();
207 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
208 "Index " + index.getName() + " already exists");
209 return; //Ignore, index already exists
212 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
213 "Create index " + index.getName());
214 IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
215 for (ChampField field : index.getFields()) {
216 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
219 ib.buildCompositeIndex();
221 createIndexMgmt.commit();
224 awaitIndexCreation(index.getName());
228 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
230 throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
233 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
234 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
237 return Optional.empty();
239 if (index.getIndexedElement() != JanusGraphVertex.class) {
240 return Optional.empty();
243 List<String> fieldNames = new ArrayList<String>();
244 for (int i = 0; i < index.getFieldKeys().length; i++) {
245 fieldNames.add(index.getFieldKeys()[i].name());
248 return Optional.of(ChampObjectIndex.create()
250 .onType(ChampObject.ReservedTypes.ANY.toString())
251 .forFields(fieldNames)
256 public Stream<ChampObjectIndex> retrieveObjectIndices() {
258 throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
261 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
262 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
264 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
266 private ChampObjectIndex next;
269 public boolean hasNext() {
270 if (indices.hasNext()) {
271 final JanusGraphIndex index = indices.next();
273 List<String> fieldNames = new ArrayList<String>();
274 for (int i = 0; i < index.getFieldKeys().length; i++) {
275 fieldNames.add(index.getFieldKeys()[i].name());
278 next = ChampObjectIndex.create()
279 .ofName(index.name())
280 .onType(ChampObject.ReservedTypes.ANY.toString())
281 .forFields(fieldNames)
291 public ChampObjectIndex next() {
293 throw new NoSuchElementException();
300 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
301 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
305 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
307 throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
310 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
314 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
316 throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
319 final JanusGraph graph = getGraph();
320 final JanusGraphManagement createIndexMgmt = graph.openManagement();
321 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
323 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
324 return; //Ignore, index already exists
327 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
328 "Create edge index " + index.getName());
329 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
331 createIndexMgmt.commit();
334 awaitIndexCreation(index.getName());
338 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
340 throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
343 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
344 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
347 return Optional.empty();
349 if (index.getIndexedElement() != JanusGraphEdge.class) {
350 return Optional.empty();
353 return Optional.of(ChampRelationshipIndex.create()
355 .onType(ChampObject.ReservedTypes.ANY.toString())
356 .forField(index.getFieldKeys()[0].name())
361 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
363 throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
366 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
367 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
369 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
371 private ChampRelationshipIndex next;
374 public boolean hasNext() {
375 if (indices.hasNext()) {
376 final JanusGraphIndex index = indices.next();
378 next = ChampRelationshipIndex.create()
379 .ofName(index.name())
380 .onType(ChampRelationship.ReservedTypes.ANY.toString())
381 .forField(index.getFieldKeys()[0].name())
391 public ChampRelationshipIndex next() {
393 throw new NoSuchElementException();
400 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
401 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
405 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
407 throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
410 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
413 private Cardinality getJanusCardinality(ChampCardinality cardinality) {
414 switch (cardinality) {
416 return Cardinality.LIST;
418 return Cardinality.SET;
420 return Cardinality.SINGLE;
422 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
426 private void awaitIndexCreation(String indexName) {
427 //Wait for the index to become available
429 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
430 .status(SchemaStatus.ENABLED)
431 .timeout(1, ChronoUnit.SECONDS)
434 return; //Empty graphs immediately ENABLE indices
437 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
438 .status(SchemaStatus.REGISTERED)
439 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
442 LOGGER.warn(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_WARN,
443 "Object index was created, but timed out while waiting for it to be registered");
446 } catch (InterruptedException e) {
447 LOGGER.warn(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_WARN,
448 "Interrupted while waiting for object index creation status");
449 Thread.currentThread().interrupt();
453 //Reindex the existing data
456 final JanusGraphManagement updateIndexMgmt = graph.openManagement();
457 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
458 updateIndexMgmt.commit();
459 } catch (InterruptedException e) {
460 LOGGER.warn(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_WARN,
461 "Interrupted while reindexing for object index");
462 Thread.currentThread().interrupt();
464 } catch (ExecutionException e) {
465 LOGGER.warn(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_WARN,
466 "Exception occurred during reindexing procedure for creating object index " + indexName + ". " + e.getMessage());
470 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
471 .status(SchemaStatus.ENABLED)
472 .timeout(2, ChronoUnit.MINUTES)
474 } catch (InterruptedException e) {
475 LOGGER.warn(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_WARN,
476 "Interrupted while waiting for index to transition to ENABLED state");
477 Thread.currentThread().interrupt();
483 public ChampCapabilities capabilities() {
487 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
488 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
490 final ChampSchema currentSchema = retrieveSchema();
491 final JanusGraphManagement mgmt = getGraph().openManagement();
494 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
495 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
496 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
498 if (currentObjConstraint.isPresent()) {
499 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
501 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
502 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
506 final String newPropertyKeyName = propConstraint.getField().getName();
508 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
510 mgmt.makePropertyKey(newPropertyKeyName)
511 .dataType(propConstraint.getField().getJavaType())
512 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
517 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
519 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
521 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
523 if (currentRelConstraint.isPresent()) {
524 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
526 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
527 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
531 final String newPropertyKeyName = propConstraint.getField().getName();
533 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
535 mgmt.makePropertyKey(newPropertyKeyName)
536 .dataType(propConstraint.getField().getJavaType())
537 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
541 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
543 if (edgeLabel != null) {
544 mgmt.makeEdgeLabel(relConstraint.getType())
552 super.storeSchema(schema);
553 } catch (SchemaViolationException | ChampSchemaViolationException e) {
555 throw new ChampSchemaViolationException(e);
559 private synchronized void openGraph() {
561 graph = janusGraphBuilder.open();
565 public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
566 return query.hasLabel(type);
571 public void createDefaultIndexes() {
573 throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
576 final String EDGE_IX_NAME = "rel-key-uuid";
578 final JanusGraph graph = getGraph();
579 JanusGraphManagement createIndexMgmt = graph.openManagement();
581 boolean vertexIndexExists = (createIndexMgmt.getGraphIndex(KEY_PROPERTY_NAME) != null);
582 boolean edgeIndexExists = (createIndexMgmt.getGraphIndex(EDGE_IX_NAME) != null);
583 boolean nodeTypeIndexExists = (createIndexMgmt.getGraphIndex(NODE_TYPE_PROPERTY_NAME) != null);
585 if (!vertexIndexExists || !edgeIndexExists) {
586 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(KEY_PROPERTY_NAME);
588 if (!vertexIndexExists) {
589 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
590 "Create Index " + KEY_PROPERTY_NAME);
591 createIndexMgmt.buildIndex(KEY_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
593 if (!edgeIndexExists) {
594 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
595 "Create Index " + EDGE_IX_NAME);
596 createIndexMgmt.buildIndex(EDGE_IX_NAME, Edge.class).addKey(pk).buildCompositeIndex();
598 createIndexMgmt.commit();
600 if (!vertexIndexExists) {
601 awaitIndexCreation(KEY_PROPERTY_NAME);
603 if (!edgeIndexExists) {
604 awaitIndexCreation(EDGE_IX_NAME);
608 createIndexMgmt.rollback();
609 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
610 "Index " + KEY_PROPERTY_NAME + " and " + EDGE_IX_NAME + " already exist");
615 if (!nodeTypeIndexExists) {
616 LOGGER.info(ChampJanusMsgs.JANUS_CHAMP_GRAPH_IMPL_INFO,
617 "Create Index " + NODE_TYPE_PROPERTY_NAME);
618 createIndexMgmt = graph.openManagement();
619 PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(NODE_TYPE_PROPERTY_NAME);
620 createIndexMgmt.buildIndex(NODE_TYPE_PROPERTY_NAME, Vertex.class).addKey(pk).buildCompositeIndex();
621 createIndexMgmt.commit();
622 awaitIndexCreation(NODE_TYPE_PROPERTY_NAME);