2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 package org.onap.aai.champ.graph.impl;
24 import java.io.IOException;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.List;
29 import java.util.Map.Entry;
30 import java.util.NoSuchElementException;
31 import java.util.Optional;
33 import java.util.Spliterator;
34 import java.util.Spliterators;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.stream.Stream;
37 import java.util.stream.StreamSupport;
39 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
40 import org.apache.tinkerpop.gremlin.structure.Direction;
41 import org.apache.tinkerpop.gremlin.structure.Edge;
42 import org.apache.tinkerpop.gremlin.structure.Graph;
43 import org.apache.tinkerpop.gremlin.structure.Property;
44 import org.apache.tinkerpop.gremlin.structure.Vertex;
45 import org.apache.tinkerpop.gremlin.structure.VertexProperty;
46 import org.onap.aai.champ.event.ChampEvent;
47 import org.onap.aai.champ.event.ChampEvent.ChampOperation;
48 import org.onap.aai.champ.exceptions.ChampMarshallingException;
49 import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
50 import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
51 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
52 import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
53 import org.onap.aai.champ.graph.impl.TitanChampGraphImpl.Builder;
54 import org.onap.aai.champ.model.ChampObject;
55 import org.onap.aai.champ.model.ChampPartition;
56 import org.onap.aai.champ.model.ChampRelationship;
57 import org.onap.aai.champ.model.ChampSchema;
58 import org.onap.aai.champ.model.fluent.partition.CreateChampPartitionable;
59 import org.onap.aai.champ.transform.TinkerpopChampformer;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 import com.fasterxml.jackson.core.JsonProcessingException;
64 import com.fasterxml.jackson.databind.ObjectMapper;
66 public abstract class AbstractTinkerpopChampGraph extends AbstractValidatingChampGraph {
68 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTinkerpopChampGraph.class);
69 private static final TinkerpopChampformer TINKERPOP_CHAMPFORMER = new TinkerpopChampformer();
70 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
72 private static final int COMMIT_RETRY_COUNT = 3;
74 private volatile AtomicBoolean isShutdown;
76 protected AbstractTinkerpopChampGraph(Map<String, Object> properties) {
79 isShutdown = new AtomicBoolean(false);
80 Runtime.getRuntime().addShutdownHook(shutdownHook);
83 private static final TinkerpopChampformer getChampformer() {
84 return TINKERPOP_CHAMPFORMER;
87 private static final ObjectMapper getObjectMapper() {
91 private Vertex writeVertex(ChampObject object) throws ChampObjectNotExistsException, ChampMarshallingException {
94 if (object.getKey().isPresent()) {
95 final Iterator<Vertex> vertexIter = getGraph().vertices(object.getKey().get());
97 if (vertexIter.hasNext()) {
98 vertex = vertexIter.next();
99 } else throw new ChampObjectNotExistsException();
101 vertex = getGraph().addVertex(object.getType());
104 for (Entry<String, Object> property : object.getProperties().entrySet()) {
106 if (property.getValue() instanceof List) {
107 for (Object subPropertyValue : (List<?>) property.getValue()) {
108 vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
110 } else if (property.getValue() instanceof Set) {
111 for (Object subPropertyValue : (Set<?>) property.getValue()) {
112 vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
115 vertex.property(property.getKey(), property.getValue());
122 private Vertex replaceVertex(ChampObject object) throws ChampObjectNotExistsException, ChampMarshallingException {
125 if (object.getKey().isPresent()) {
126 final Iterator<Vertex> vertexIter = getGraph().vertices(object.getKey().get());
128 if (vertexIter.hasNext()) {
129 vertex = vertexIter.next();
130 } else throw new ChampObjectNotExistsException();
132 throw new ChampObjectNotExistsException();
135 //clear all the existing properties
136 Iterator<VertexProperty<Object>> it = vertex.properties();
137 while (it.hasNext()) {
141 for (Entry<String, Object> property : object.getProperties().entrySet()) {
143 if (property.getValue() instanceof List) {
144 for (Object subPropertyValue : (List<?>) property.getValue()) {
145 vertex.property(VertexProperty.Cardinality.list, property.getKey(), subPropertyValue);
147 } else if (property.getValue() instanceof Set) {
148 for (Object subPropertyValue : (Set<?>) property.getValue()) {
149 vertex.property(VertexProperty.Cardinality.set, property.getKey(), subPropertyValue);
152 vertex.property(property.getKey(), property.getValue());
159 private Edge writeEdge(ChampRelationship relationship) throws ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampMarshallingException {
161 final Vertex source = writeVertex(relationship.getSource());
162 final Vertex target = writeVertex(relationship.getTarget());
165 if (relationship.getKey().isPresent()) {
166 final Iterator<Edge> edgeIter = getGraph().edges(relationship.getKey().get());
168 if (edgeIter.hasNext()) {
169 edge = edgeIter.next();
170 } else throw new ChampRelationshipNotExistsException();
172 edge = source.addEdge(relationship.getType(), target);
175 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
176 edge.property(property.getKey(), property.getValue());
182 private Edge replaceEdge(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampMarshallingException {
185 if(!relationship.getSource().getKey().isPresent() || !relationship.getTarget().getKey().isPresent()){
186 throw new IllegalArgumentException("Invalid source/target");
189 if (relationship.getKey().isPresent()) {
190 final Iterator<Edge> edgeIter = getGraph().edges(relationship.getKey().get());
192 if (edgeIter.hasNext()) {
193 edge = edgeIter.next();
194 //validate if the source/target are the same as before. Throw error if not the same
195 if (!edge.outVertex().id().equals(relationship.getSource().getKey().get())
196 || !edge.inVertex().id().equals(relationship.getTarget().getKey().get())) {
197 throw new IllegalArgumentException("source/target can't be updated");
200 } else throw new ChampRelationshipNotExistsException();
202 throw new ChampRelationshipNotExistsException();
205 // clear all the existing properties
206 Iterator<Property<Object>> it = edge.properties();
207 while (it.hasNext()) {
211 for (Entry<String, Object> property : relationship.getProperties().entrySet()) {
212 edge.property(property.getKey(), property.getValue());
218 private void tryRollback() {
219 if (getGraph().features().graph().supportsTransactions()) {
220 getGraph().tx().rollback();
224 private void tryCommit() {
226 if (getGraph().features().graph().supportsTransactions()) {
228 final long initialBackoff = (int) (Math.random() * 50);
230 for (int i = 0; i < COMMIT_RETRY_COUNT; i++) {
232 getGraph().tx().commit();
234 } catch (Throwable e) {
235 if (i == COMMIT_RETRY_COUNT - 1) {
236 LOGGER.error("Maxed out commit attempt retries, client must handle exception and retry", e);
237 getGraph().tx().rollback();
241 final long backoff = (long) Math.pow(2, i) * initialBackoff;
242 LOGGER.warn("Caught exception while retrying transaction commit, retrying in " + backoff + " ms");
245 Thread.sleep(backoff);
246 } catch (InterruptedException ie) {
247 LOGGER.info("Interrupted while backing off on transaction commit");
255 protected abstract Graph getGraph();
257 private Thread shutdownHook = new Thread() {
262 } catch (IllegalStateException e) {
263 //Suppress, because shutdown() has already been called
268 protected boolean isShutdown() {
269 return isShutdown.get();
273 public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
274 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
276 //If they provided the object key, do this the quick way rather than creating a traversal
277 if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString())) {
279 final Optional<ChampObject> object = retrieveObject(queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_KEY.toString()));
281 if (object.isPresent()) return Stream.of(object.get());
282 else return Stream.empty();
283 } catch (ChampUnmarshallingException e) {
284 LOGGER.warn("Failed to unmarshall object", e);
285 return Stream.empty();
289 final GraphTraversal<Vertex, Vertex> query = getGraph().traversal().V();
291 for (Entry<String, Object> filter : queryParams.entrySet()) {
292 if (filter.getKey().equals(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
293 continue; //For performance reasons, the label is the last thing to be added
295 query.has(filter.getKey(), filter.getValue());
299 if (queryParams.containsKey(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString())) {
300 query.hasLabel((String) queryParams.get(ChampObject.ReservedPropertyKeys.CHAMP_OBJECT_TYPE.toString()));
303 final Iterator<ChampObject> objIter = new Iterator<ChampObject> () {
305 private ChampObject next;
309 public boolean hasNext() {
310 while (query.hasNext()) {
312 next = getChampformer().unmarshallObject(query.next());
314 } catch (ChampUnmarshallingException e) {
315 LOGGER.warn("Failed to unmarshall tinkerpop vertex during query, returning partial results", e);
319 tryCommit(); //Danger ahead if this iterator is not completely consumed
320 //then the transaction cache will hold stale values
327 public ChampObject next() {
328 if (next == null) throw new NoSuchElementException();
334 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
335 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
339 public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
340 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
342 final Iterator<Vertex> vertices = getGraph().vertices(key);
343 final Optional<ChampObject> optionalObject;
345 if (!vertices.hasNext()) optionalObject = Optional.empty();
346 else optionalObject = Optional.of(getChampformer().unmarshallObject(vertices.next()));
350 return optionalObject;
354 public Stream<ChampRelationship> retrieveRelationships(ChampObject source) throws ChampUnmarshallingException, ChampObjectNotExistsException {
355 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
357 final Vertex sourceVertex;
360 sourceVertex = getGraph().vertices(source.getKey().get()).next();
361 } catch (NoSuchElementException e) {
364 throw new ChampObjectNotExistsException();
367 final Iterator<Edge> edges = sourceVertex.edges(Direction.BOTH);
368 final Iterator<ChampRelationship> relIter = new Iterator<ChampRelationship> () {
370 private ChampRelationship next;
373 public boolean hasNext() {
374 while (edges.hasNext()) {
376 next = getChampformer().unmarshallRelationship(edges.next());
378 } catch (ChampUnmarshallingException e) {
379 LOGGER.warn("Failed to unmarshall tinkerpop edge during query, returning partial results", e);
383 tryCommit();//Danger ahead if this iterator is not completely
384 //consumed, then the transaction cache will be stale
390 public ChampRelationship next() {
391 if (next == null) throw new NoSuchElementException();
397 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
398 relIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
402 public ChampObject doStoreObject(ChampObject object) throws ChampMarshallingException, ChampObjectNotExistsException {
405 final Vertex vertex = writeVertex(object);
409 return ChampObject.create()
411 .withKey(vertex.id())
414 } catch (ChampObjectNotExistsException e) {
422 public ChampObject doReplaceObject(ChampObject object) throws ChampMarshallingException, ChampObjectNotExistsException {
425 final Vertex vertex = replaceVertex(object);
429 return ChampObject.create()
431 .withKey(vertex.id())
434 } catch (ChampObjectNotExistsException e) {
441 public void executeDeleteObject(Object key) throws ChampObjectNotExistsException {
442 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
444 final Iterator<Vertex> vertex = getGraph().vertices(key);
446 if (!vertex.hasNext()) {
449 throw new ChampObjectNotExistsException();
452 vertex.next().remove();
458 public ChampRelationship doStoreRelationship(ChampRelationship relationship)
459 throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampRelationshipNotExistsException, ChampMarshallingException {
462 final Edge edge = writeEdge(relationship);
466 return getChampformer().unmarshallRelationship(edge);
468 } catch (ChampObjectNotExistsException | ChampRelationshipNotExistsException | ChampUnmarshallingException | ChampMarshallingException e) {
476 public ChampRelationship doReplaceRelationship(ChampRelationship relationship)
477 throws ChampUnmarshallingException, ChampRelationshipNotExistsException, ChampMarshallingException {
480 final Edge edge = replaceEdge(relationship);
484 return getChampformer().unmarshallRelationship(edge);
486 } catch ( ChampRelationshipNotExistsException | ChampUnmarshallingException | ChampMarshallingException e) {
494 public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
495 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
497 //If they provided the relationship key, do this the quick way rather than creating a traversal
498 if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString())) {
500 final Optional<ChampRelationship> relationship = retrieveRelationship(queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_KEY.toString()));
502 if (relationship.isPresent()) return Stream.of(relationship.get());
503 else return Stream.empty();
504 } catch (ChampUnmarshallingException e) {
505 LOGGER.warn("Failed to unmarshall relationship", e);
506 return Stream.empty();
510 final GraphTraversal<Edge, Edge> query = getGraph().traversal().E();
512 for (Entry<String, Object> filter : queryParams.entrySet()) {
513 if (filter.getKey().equals(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
514 continue; //Add the label last for performance reasons
516 query.has(filter.getKey(), filter.getValue());
520 if (queryParams.containsKey(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString())) {
521 query.hasLabel((String) queryParams.get(ChampRelationship.ReservedPropertyKeys.CHAMP_RELATIONSHIP_TYPE.toString()));
524 final Iterator<ChampRelationship> objIter = new Iterator<ChampRelationship> () {
526 private ChampRelationship next;
530 public boolean hasNext() {
531 while (query.hasNext()) {
533 next = getChampformer().unmarshallRelationship(query.next());
535 } catch (ChampUnmarshallingException e) {
536 LOGGER.warn("Failed to unmarshall tinkerpop vertex during query, returning partial results", e);
540 tryCommit(); //Danger ahead if this iterator is not completely
541 //consumed, then the transaction cache will be stale
548 public ChampRelationship next() {
549 if (next == null) throw new NoSuchElementException();
555 return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
556 objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
560 public Optional<ChampRelationship> retrieveRelationship(Object key)
561 throws ChampUnmarshallingException {
562 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
564 final Iterator<Edge> edge = getGraph().edges(key);
565 final Optional<ChampRelationship> optionalRelationship;
567 if (!edge.hasNext()) optionalRelationship = Optional.empty();
568 else optionalRelationship = Optional.of(getChampformer().unmarshallRelationship(edge.next()));
572 return optionalRelationship;
575 public void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
576 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
577 if (!relationship.getKey().isPresent()) throw new IllegalArgumentException("Key must be provided when deleting a relationship");
579 final Iterator<Edge> edge = getGraph().edges(relationship.getKey().get());
581 if (!edge.hasNext()) {
584 throw new ChampRelationshipNotExistsException();
587 edge.next().remove();
593 public ChampPartition doStorePartition(ChampPartition submittedPartition) throws ChampMarshallingException, ChampObjectNotExistsException, ChampRelationshipNotExistsException {
594 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
597 final HashMap<ChampObject, ChampObject> objectsWithKeys = new HashMap<ChampObject, ChampObject> ();
598 final CreateChampPartitionable storedPartition = ChampPartition.create();
600 for (ChampObject champObject : submittedPartition.getChampObjects()) {
601 final Vertex vertex = writeVertex(champObject);
602 objectsWithKeys.put(champObject, ChampObject.create()
604 .withKey(vertex.id())
608 for (ChampRelationship champRelationship : submittedPartition.getChampRelationships()) {
609 if (!objectsWithKeys.containsKey(champRelationship.getSource())) {
610 final Vertex vertex = writeVertex(champRelationship.getSource());
612 objectsWithKeys.put(champRelationship.getSource(), ChampObject.create()
613 .from(champRelationship.getSource())
614 .withKey(vertex.id())
618 if (!objectsWithKeys.containsKey(champRelationship.getTarget())) {
619 final Vertex vertex = writeVertex(champRelationship.getTarget());
621 objectsWithKeys.put(champRelationship.getTarget(), ChampObject.create()
622 .from(champRelationship.getTarget())
623 .withKey(vertex.id())
627 final ChampRelationship.Builder relWithKeysBuilder = new ChampRelationship.Builder(objectsWithKeys.get(champRelationship.getSource()),
628 objectsWithKeys.get(champRelationship.getTarget()),
629 champRelationship.getType());
631 if (champRelationship.getKey().isPresent()) relWithKeysBuilder.key(champRelationship.getKey().get());
633 relWithKeysBuilder.properties(champRelationship.getProperties());
635 final Edge edge = writeEdge(relWithKeysBuilder.build());
637 storedPartition.withRelationship(ChampRelationship.create()
638 .from(champRelationship)
643 for (ChampObject object : objectsWithKeys.values()) {
644 storedPartition.withObject(object);
649 return storedPartition.build();
651 } catch (ChampObjectNotExistsException | ChampMarshallingException e) {
658 public void executeDeletePartition(ChampPartition graph) {
659 if (isShutdown()) throw new IllegalStateException("Cannot use ChampAPI after calling shutdown()");
661 for (ChampObject champObject : graph.getChampObjects()) {
663 final Object vertexId = champObject.getKey().get();
664 final Iterator<Vertex> vertex = getGraph().vertices(vertexId);
666 if (vertex.hasNext()) {
667 vertex.next().remove();
669 } catch (NoSuchElementException e) {
672 throw new IllegalArgumentException("Must pass a key to delete an object");
676 for (ChampRelationship champRelationship : graph.getChampRelationships()) {
678 final Iterator<Edge> edge = getGraph().edges(champRelationship.getKey().get());
680 if (edge.hasNext()) {
681 edge.next().remove();
683 } catch (NoSuchElementException e) {
686 throw new IllegalArgumentException("Must pass a key to delete a relationship");
695 public void shutdown() {
697 if (isShutdown.compareAndSet(false, true)) {
701 } catch (Throwable t) {
702 LOGGER.error("Exception while shutting down graph", t);
705 throw new IllegalStateException("Cannot call shutdown() after shutdown() was already initiated");
710 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
711 if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
713 if (getGraph().features().graph().variables().supportsVariables()) {
715 getGraph().variables().set("schema", getObjectMapper().writeValueAsBytes(schema));
716 } catch (JsonProcessingException e) {
717 throw new RuntimeException(e);
720 super.storeSchema(schema);
725 public ChampSchema retrieveSchema() {
726 if (isShutdown()) throw new IllegalStateException("Cannot call retrieveSchema() after shutdown has been initiated");
728 if (getGraph().features().graph().variables().supportsVariables()) {
729 final Optional<byte[]> schema = getGraph().variables().get("schema");
731 if (schema.isPresent()) {
733 return getObjectMapper().readValue(schema.get(), ChampSchema.class);
734 } catch (IOException e) {
735 throw new RuntimeException(e);
740 return super.retrieveSchema();
744 public void deleteSchema() {
745 if (isShutdown()) throw new IllegalStateException("Cannot call deleteSchema() after shutdown has been initiated");
747 if (getGraph().features().graph().variables().supportsVariables()) {
748 getGraph().variables().remove("schema");
750 super.deleteSchema();