2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champjanus.graph.impl;
23 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
24 import org.apache.tinkerpop.gremlin.structure.Edge;
25 import org.apache.tinkerpop.gremlin.structure.Vertex;
26 import org.janusgraph.core.*;
27 import org.onap.aai.champcore.Formatter;
28 import org.janusgraph.core.schema.JanusGraphIndex;
29 import org.janusgraph.core.schema.JanusGraphManagement;
30 import org.janusgraph.core.schema.SchemaAction;
31 import org.janusgraph.core.schema.SchemaStatus;
32 import org.janusgraph.graphdb.database.management.ManagementSystem;
33 import org.onap.aai.champcore.ChampCapabilities;
34 import org.onap.aai.champcore.FormatMapper;
35 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
36 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
37 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
38 import org.onap.aai.champcore.model.*;
39 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
40 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import java.time.temporal.ChronoUnit;
46 import java.util.concurrent.ExecutionException;
47 import java.util.stream.Stream;
48 import java.util.stream.StreamSupport;
50 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
51 private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
52 private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
53 private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
54 private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
55 private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
56 private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
58 private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
61 public boolean canDeleteObjectIndices() {
66 public boolean canDeleteRelationshipIndices() {
71 private final JanusGraph graph;
73 public JanusChampGraphImpl(Builder builder) {
74 super(builder.graphConfiguration);
75 final JanusGraphFactory.Builder janusGraphBuilder = JanusGraphFactory.build();
77 for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
78 janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
81 janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new Random().nextInt(Short.MAX_VALUE)+""));
83 final Object storageBackend = builder.graphConfiguration.get("storage.backend");
85 if ("cassandra".equals(storageBackend) ||
86 "cassandrathrift".equals(storageBackend) ||
87 "astyanax".equals(storageBackend) ||
88 "embeddedcassandra".equals(storageBackend)) {
90 janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
91 } else if ("hbase".equals(storageBackend)) {
92 janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
93 } else if ("berkleyje".equals(storageBackend)) {
94 throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
95 } else if ("inmemory".equals(storageBackend)) {
97 throw new RuntimeException("Unknown storage.backend=" + storageBackend);
100 LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
101 this.graph = janusGraphBuilder.open();
104 public static class Builder {
105 private final String graphName;
107 private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
109 public Builder(String graphName) {
110 this.graphName = graphName;
113 public Builder(String graphName, Map<String, Object> properties) {
114 this.graphName = graphName;
115 properties(properties);
118 public Builder properties(Map<String, Object> properties) {
119 if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
120 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
121 + " in initial configuration - this path is used"
122 + " to specify graph names");
125 this.graphConfiguration.putAll(properties);
129 public Builder property(String path, Object value) {
130 if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
131 throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
132 + " in initial configuration - this path is used"
133 + " to specify graph names");
135 graphConfiguration.put(path, value);
139 public JanusChampGraphImpl build() {
140 return new JanusChampGraphImpl(this);
145 protected JanusGraph getGraph() {
151 protected ChampSchemaEnforcer getSchemaEnforcer() {
152 return SCHEMA_ENFORCER;
156 public void executeStoreObjectIndex(ChampObjectIndex index) {
158 throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
161 final JanusGraph graph = getGraph();
162 final JanusGraphManagement createIndexMgmt = graph.openManagement();
163 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
165 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
166 createIndexMgmt.rollback();
167 return; //Ignore, index already exists
170 createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
172 createIndexMgmt.commit();
175 awaitIndexCreation(index.getName());
179 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
181 throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
184 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
185 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
188 return Optional.empty();
190 if (index.getIndexedElement() != JanusGraphVertex.class) {
191 return Optional.empty();
194 return Optional.of(ChampObjectIndex.create()
196 .onType(ChampObject.ReservedTypes.ANY.toString())
197 .forField(index.getFieldKeys()[0].name())
202 public Stream<ChampObjectIndex> retrieveObjectIndices() {
204 throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
207 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
208 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
210 final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
212 private ChampObjectIndex next;
215 public boolean hasNext() {
216 if (indices.hasNext()) {
217 final JanusGraphIndex index = indices.next();
219 next = ChampObjectIndex.create()
220 .ofName(index.name())
221 .onType(ChampObject.ReservedTypes.ANY.toString())
222 .forField(index.getFieldKeys()[0].name())
232 public ChampObjectIndex next() {
234 throw new NoSuchElementException();
241 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
242 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
246 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
248 throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
251 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
255 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
257 throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
260 final JanusGraph graph = getGraph();
261 final JanusGraphManagement createIndexMgmt = graph.openManagement();
262 final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
264 if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
265 return; //Ignore, index already exists
267 createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
269 createIndexMgmt.commit();
272 awaitIndexCreation(index.getName());
276 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
278 throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
281 final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
282 final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
285 return Optional.empty();
287 if (index.getIndexedElement() != JanusGraphEdge.class) {
288 return Optional.empty();
291 return Optional.of(ChampRelationshipIndex.create()
293 .onType(ChampObject.ReservedTypes.ANY.toString())
294 .forField(index.getFieldKeys()[0].name())
299 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
301 throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
304 final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
305 final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
307 final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
309 private ChampRelationshipIndex next;
312 public boolean hasNext() {
313 if (indices.hasNext()) {
314 final JanusGraphIndex index = indices.next();
316 next = ChampRelationshipIndex.create()
317 .ofName(index.name())
318 .onType(ChampRelationship.ReservedTypes.ANY.toString())
319 .forField(index.getFieldKeys()[0].name())
329 public ChampRelationshipIndex next() {
331 throw new NoSuchElementException();
338 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
339 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
343 public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
345 throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
348 throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
351 private Cardinality getJanusCardinality(ChampCardinality cardinality) {
352 switch (cardinality) {
354 return Cardinality.LIST;
356 return Cardinality.SET;
358 return Cardinality.SINGLE;
360 throw new RuntimeException("Unknown ChampCardinality " + cardinality);
364 private void awaitIndexCreation(String indexName) {
365 //Wait for the index to become available
367 if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
368 .status(SchemaStatus.ENABLED)
369 .timeout(1, ChronoUnit.SECONDS)
372 return; //Empty graphs immediately ENABLE indices
375 if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
376 .status(SchemaStatus.REGISTERED)
377 .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
380 LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
383 } catch (InterruptedException e) {
384 LOGGER.warn("Interrupted while waiting for object index creation status");
388 //Reindex the existing data
391 final JanusGraphManagement updateIndexMgmt = graph.openManagement();
392 updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
393 updateIndexMgmt.commit();
394 } catch (InterruptedException e) {
395 LOGGER.warn("Interrupted while reindexing for object index");
397 } catch (ExecutionException e) {
398 LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
402 ManagementSystem.awaitGraphIndexStatus(graph, indexName)
403 .status(SchemaStatus.ENABLED)
404 .timeout(10, ChronoUnit.MINUTES)
406 } catch (InterruptedException e) {
407 LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
413 public ChampCapabilities capabilities() {
417 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
418 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
420 final ChampSchema currentSchema = retrieveSchema();
421 final JanusGraphManagement mgmt = getGraph().openManagement();
424 for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
425 for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
426 final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
428 if (currentObjConstraint.isPresent()) {
429 final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
431 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
432 throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
436 final String newPropertyKeyName = propConstraint.getField().getName();
438 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
440 mgmt.makePropertyKey(newPropertyKeyName)
441 .dataType(propConstraint.getField().getJavaType())
442 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
447 for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
449 final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
451 for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
453 if (currentRelConstraint.isPresent()) {
454 final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
456 if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
457 throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
461 final String newPropertyKeyName = propConstraint.getField().getName();
463 if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
465 mgmt.makePropertyKey(newPropertyKeyName)
466 .dataType(propConstraint.getField().getJavaType())
467 .cardinality(getJanusCardinality(propConstraint.getCardinality()))
471 final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
473 if (edgeLabel != null) {
474 mgmt.makeEdgeLabel(relConstraint.getType())
482 super.storeSchema(schema);
483 } catch (SchemaViolationException | ChampSchemaViolationException e) {
485 throw new ChampSchemaViolationException(e);
489 public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
490 return query.hasLabel(type);