2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 package org.onap.aai.champjanus.graph.impl;
24 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
25 import org.apache.tinkerpop.gremlin.structure.Edge;
26 import org.apache.tinkerpop.gremlin.structure.Vertex;
27 import org.janusgraph.core.*;
28 import org.onap.aai.champcore.Formatter;
29 import org.janusgraph.core.schema.JanusGraphIndex;
30 import org.janusgraph.core.schema.JanusGraphManagement;
31 import org.janusgraph.core.schema.SchemaAction;
32 import org.janusgraph.core.schema.SchemaStatus;
33 import org.janusgraph.graphdb.database.management.ManagementSystem;
34 import org.onap.aai.champcore.ChampCapabilities;
35 import org.onap.aai.champcore.FormatMapper;
36 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
37 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
38 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
39 import org.onap.aai.champcore.model.*;
40 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
41 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import java.time.temporal.ChronoUnit;
47 import java.util.concurrent.ExecutionException;
48 import java.util.stream.Stream;
49 import java.util.stream.StreamSupport;
51 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
52 private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
53 private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
54 private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
55 private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
56 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
57 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
59 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
62 public boolean canDeleteObjectIndices() {
67 public boolean canDeleteRelationshipIndices() {
72 private final JanusGraph graph;
74 public JanusChampGraphImpl(Builder builder) {
75 super(builder.graphConfiguration);
76 final JanusGraphFactory.Builder janusGraphBuilder = JanusGraphFactory.build();
78 for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
79 janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
82 janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new Random().nextInt(Short.MAX_VALUE)+""));
84 final Object storageBackend = builder.graphConfiguration.get("storage.backend");
86 if ("cassandra".equals(storageBackend) ||
87 "cassandrathrift".equals(storageBackend) ||
88 "astyanax".equals(storageBackend) ||
89 "embeddedcassandra".equals(storageBackend)) {
91 janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
92 } else if ("hbase".equals(storageBackend)) {
93 janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
94 } else if ("berkleyje".equals(storageBackend)) {
95 throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
96 } else if ("inmemory".equals(storageBackend)) {
98 throw new RuntimeException("Unknown storage.backend=" + storageBackend);
101 LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
102 this.graph = janusGraphBuilder.open();
105 public static class Builder {
106 private final String graphName;
108 private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
110 public Builder(String graphName) {
111 this.graphName = graphName;
114 public Builder(String graphName, Map<String, Object> properties) {
115 this.graphName = graphName;
116 properties(properties);
119 public Builder properties(Map<String, Object> properties) {
120 if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
121 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
122 + " in initial configuration - this path is used"
123 + " to specify graph names");
126 this.graphConfiguration.putAll(properties);
130 public Builder property(String path, Object value) {
131 if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
132 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
133 + " in initial configuration - this path is used"
134 + " to specify graph names");
136 graphConfiguration.put(path, value);
140 public JanusChampGraphImpl build() {
141 return new JanusChampGraphImpl(this);
146 protected JanusGraph getGraph() {
152 protected ChampSchemaEnforcer getSchemaEnforcer() {
153 return SCHEMA_ENFORCER;
157 public void executeStoreObjectIndex(ChampObjectIndex index) {
159 throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
162 final JanusGraph graph = getGraph();
163 final JanusGraphManagement createIndexMgmt = graph.openManagement();
164 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
166 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
167 createIndexMgmt.rollback();
168 return; //Ignore, index already exists
171 createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
173 createIndexMgmt.commit();
176 awaitIndexCreation(index.getName());
180 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
182 throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
185 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
186 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
189 return Optional.empty();
191 if (index.getIndexedElement() != JanusGraphVertex.class) {
192 return Optional.empty();
195 return Optional.of(ChampObjectIndex.create()
197 .onType(ChampObject.ReservedTypes.ANY.toString())
198 .forField(index.getFieldKeys()[0].name())
203 public Stream<ChampObjectIndex> retrieveObjectIndices() {
205 throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
208 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
209 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
211 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
213 private ChampObjectIndex next;
216 public boolean hasNext() {
217 if (indices.hasNext()) {
218 final JanusGraphIndex index = indices.next();
220 next = ChampObjectIndex.create()
221 .ofName(index.name())
222 .onType(ChampObject.ReservedTypes.ANY.toString())
223 .forField(index.getFieldKeys()[0].name())
233 public ChampObjectIndex next() {
235 throw new NoSuchElementException();
242 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
243 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
247 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
249 throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
252 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
256 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
258 throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
261 final JanusGraph graph = getGraph();
262 final JanusGraphManagement createIndexMgmt = graph.openManagement();
263 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
265 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
266 return; //Ignore, index already exists
268 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
270 createIndexMgmt.commit();
273 awaitIndexCreation(index.getName());
277 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
279 throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
282 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
283 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
286 return Optional.empty();
288 if (index.getIndexedElement() != JanusGraphEdge.class) {
289 return Optional.empty();
292 return Optional.of(ChampRelationshipIndex.create()
294 .onType(ChampObject.ReservedTypes.ANY.toString())
295 .forField(index.getFieldKeys()[0].name())
300 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
302 throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
305 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
306 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
308 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
310 private ChampRelationshipIndex next;
313 public boolean hasNext() {
314 if (indices.hasNext()) {
315 final JanusGraphIndex index = indices.next();
317 next = ChampRelationshipIndex.create()
318 .ofName(index.name())
319 .onType(ChampRelationship.ReservedTypes.ANY.toString())
320 .forField(index.getFieldKeys()[0].name())
330 public ChampRelationshipIndex next() {
332 throw new NoSuchElementException();
339 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
340 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
344 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
346 throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
349 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
352 private Cardinality getJanusCardinality(ChampCardinality cardinality) {
353 switch (cardinality) {
355 return Cardinality.LIST;
357 return Cardinality.SET;
359 return Cardinality.SINGLE;
361 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
365 private void awaitIndexCreation(String indexName) {
366 //Wait for the index to become available
368 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
369 .status(SchemaStatus.ENABLED)
370 .timeout(1, ChronoUnit.SECONDS)
373 return; //Empty graphs immediately ENABLE indices
376 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
377 .status(SchemaStatus.REGISTERED)
378 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
381 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
384 } catch (InterruptedException e) {
385 LOGGER.warn("Interrupted while waiting for object index creation status");
389 //Reindex the existing data
392 final JanusGraphManagement updateIndexMgmt = graph.openManagement();
393 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
394 updateIndexMgmt.commit();
395 } catch (InterruptedException e) {
396 LOGGER.warn("Interrupted while reindexing for object index");
398 } catch (ExecutionException e) {
399 LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
403 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
404 .status(SchemaStatus.ENABLED)
405 .timeout(10, ChronoUnit.MINUTES)
407 } catch (InterruptedException e) {
408 LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
414 public ChampCapabilities capabilities() {
418 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
419 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
421 final ChampSchema currentSchema = retrieveSchema();
422 final JanusGraphManagement mgmt = getGraph().openManagement();
425 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
426 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
427 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
429 if (currentObjConstraint.isPresent()) {
430 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
432 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
433 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
437 final String newPropertyKeyName = propConstraint.getField().getName();
439 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
441 mgmt.makePropertyKey(newPropertyKeyName)
442 .dataType(propConstraint.getField().getJavaType())
443 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
448 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
450 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
452 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
454 if (currentRelConstraint.isPresent()) {
455 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
457 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
458 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
462 final String newPropertyKeyName = propConstraint.getField().getName();
464 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
466 mgmt.makePropertyKey(newPropertyKeyName)
467 .dataType(propConstraint.getField().getJavaType())
468 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
472 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
474 if (edgeLabel != null) {
475 mgmt.makeEdgeLabel(relConstraint.getType())
483 super.storeSchema(schema);
484 } catch (SchemaViolationException | ChampSchemaViolationException e) {
486 throw new ChampSchemaViolationException(e);
490 public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
491 return query.hasLabel(type);