Update license date and text
[aai/champ.git] / champ-lib / champ-janus / src / main / java / org / onap / aai / champjanus / graph / impl / JanusChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champjanus.graph.impl;
22
23 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
24 import org.apache.tinkerpop.gremlin.structure.Edge;
25 import org.apache.tinkerpop.gremlin.structure.Vertex;
26 import org.janusgraph.core.*;
27 import org.onap.aai.champcore.Formatter;
28 import org.janusgraph.core.schema.JanusGraphIndex;
29 import org.janusgraph.core.schema.JanusGraphManagement;
30 import org.janusgraph.core.schema.SchemaAction;
31 import org.janusgraph.core.schema.SchemaStatus;
32 import org.janusgraph.graphdb.database.management.ManagementSystem;
33 import org.onap.aai.champcore.ChampCapabilities;
34 import org.onap.aai.champcore.FormatMapper;
35 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
36 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
37 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
38 import org.onap.aai.champcore.model.*;
39 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
40 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.time.temporal.ChronoUnit;
45 import java.util.*;
46 import java.util.concurrent.ExecutionException;
47 import java.util.stream.Stream;
48 import java.util.stream.StreamSupport;
49
50 public final class JanusChampGraphImpl extends AbstractTinkerpopChampGraph {
  /** Class-wide SLF4J logger. */
  private static final Logger LOGGER = LoggerFactory.getLogger(JanusChampGraphImpl.class);
  /** JanusGraph configuration path the constructor sets to the graph name for Cassandra-family backends. */
  private static final String JANUS_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
  /** JanusGraph configuration path the constructor sets to the graph name for the HBase backend. */
  private static final String JANUS_HBASE_TABLE = "storage.hbase.table";
  /** JanusGraph configuration path the constructor sets to a randomly generated value. */
  private static final String JANUS_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
  /** Schema enforcer shared by all instances; returned from {@code getSchemaEnforcer()}. */
  private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
  /** Seconds {@code awaitIndexCreation} waits for a new index to reach the REGISTERED status. */
  private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
57
  /**
   * Capabilities advertised by this implementation: neither object indices nor relationship
   * indices can be deleted (the corresponding executeDelete*Index methods throw
   * {@link UnsupportedOperationException}).
   */
  private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {

    @Override
    public boolean canDeleteObjectIndices() {
      return false;
    }

    @Override
    public boolean canDeleteRelationshipIndices() {
      return false;
    }
  };
70
  /** The JanusGraph instance opened by the constructor and used for all operations of this class. */
  private final JanusGraph graph;
72
73   public JanusChampGraphImpl(Builder builder) {
74     super(builder.graphConfiguration);
75     final JanusGraphFactory.Builder janusGraphBuilder = JanusGraphFactory.build();
76
77     for (Map.Entry<String, Object> janusGraphProperty : builder.graphConfiguration.entrySet()) {
78       janusGraphBuilder.set(janusGraphProperty.getKey(), janusGraphProperty.getValue());
79     }
80     
81     janusGraphBuilder.set(JANUS_UNIQUE_SUFFIX, ((short) new Random().nextInt(Short.MAX_VALUE)+""));
82
83     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
84
85     if ("cassandra".equals(storageBackend) ||
86         "cassandrathrift".equals(storageBackend) ||
87         "astyanax".equals(storageBackend) ||
88         "embeddedcassandra".equals(storageBackend)) {
89         
90         janusGraphBuilder.set(JANUS_CASSANDRA_KEYSPACE, builder.graphName);
91     } else if ("hbase".equals(storageBackend)) {
92       janusGraphBuilder.set(JANUS_HBASE_TABLE, builder.graphName);
93     } else if ("berkleyje".equals(storageBackend)) {
94       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
95     } else if ("inmemory".equals(storageBackend)) {
96     } else {
97       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
98     }
99     
100     LOGGER.info("Instantiated data access layer for Janus graph data store with backend: " + storageBackend);
101     this.graph = janusGraphBuilder.open();
102   }
103
104   public static class Builder {
105     private final String graphName;
106
107     private final Map<String, Object> graphConfiguration = new HashMap<String, Object>();
108
109     public Builder(String graphName) {
110       this.graphName = graphName;
111     }
112     
113     public Builder(String graphName, Map<String, Object> properties) {
114         this.graphName = graphName;
115         properties(properties);
116     }
117
118     public Builder properties(Map<String, Object> properties) {
119       if (properties.containsKey(JANUS_CASSANDRA_KEYSPACE)) {
120         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
121             + " in initial configuration - this path is used"
122             + " to specify graph names");
123       }
124
125       this.graphConfiguration.putAll(properties);
126       return this;
127     }
128
129     public Builder property(String path, Object value) {
130       if (path.equals(JANUS_CASSANDRA_KEYSPACE)) {
131         throw new IllegalArgumentException("Cannot use path " + JANUS_CASSANDRA_KEYSPACE
132             + " in initial configuration - this path is used"
133             + " to specify graph names");
134       }
135       graphConfiguration.put(path, value);
136       return this;
137     }
138
139     public JanusChampGraphImpl build() {
140       return new JanusChampGraphImpl(this);
141     }
142   }
143
  /**
   * Returns the JanusGraph instance held by this object.
   *
   * @return the graph opened by the constructor
   */
  @Override
  protected JanusGraph getGraph() {
    return graph;
  }
148
149  
  /**
   * Returns the schema enforcer used by this graph.
   *
   * @return the shared {@code SCHEMA_ENFORCER} instance
   */
  @Override
  protected ChampSchemaEnforcer getSchemaEnforcer() {
    return SCHEMA_ENFORCER;
  }
154
155   @Override
156   public void executeStoreObjectIndex(ChampObjectIndex index) {
157     if (isShutdown()) {
158       throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
159     }
160
161     final JanusGraph graph = getGraph();
162     final JanusGraphManagement createIndexMgmt = graph.openManagement();
163     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
164
165     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
166       createIndexMgmt.rollback();
167       return; //Ignore, index already exists
168     }
169
170     createIndexMgmt.buildIndex(index.getName(), Vertex.class).addKey(pk).buildCompositeIndex();
171
172     createIndexMgmt.commit();
173     graph.tx().commit();
174
175     awaitIndexCreation(index.getName());
176   }
177
178   @Override
179   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
180     if (isShutdown()) {
181       throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
182     }
183
184     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
185     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
186
187     if (index == null) {
188       return Optional.empty();
189     }
190     if (index.getIndexedElement() != JanusGraphVertex.class) {
191       return Optional.empty();
192     }
193
194     return Optional.of(ChampObjectIndex.create()
195         .ofName(indexName)
196         .onType(ChampObject.ReservedTypes.ANY.toString())
197         .forField(index.getFieldKeys()[0].name())
198         .build());
199   }
200
201   @Override
202   public Stream<ChampObjectIndex> retrieveObjectIndices() {
203     if (isShutdown()) {
204       throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
205     }
206
207     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
208     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
209
210     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex>() {
211
212       private ChampObjectIndex next;
213
214       @Override
215       public boolean hasNext() {
216         if (indices.hasNext()) {
217           final JanusGraphIndex index = indices.next();
218
219           next = ChampObjectIndex.create()
220               .ofName(index.name())
221               .onType(ChampObject.ReservedTypes.ANY.toString())
222               .forField(index.getFieldKeys()[0].name())
223               .build();
224           return true;
225         }
226
227         next = null;
228         return false;
229       }
230
231       @Override
232       public ChampObjectIndex next() {
233         if (next == null) {
234           throw new NoSuchElementException();
235         }
236
237         return next;
238       }
239     };
240
241     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
242         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
243   }
244
245   @Override
246   public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
247     if (isShutdown()) {
248       throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
249     }
250
251     throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
252   }
253
254   @Override
255   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
256     if (isShutdown()) {
257       throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
258     }
259
260     final JanusGraph graph = getGraph();
261     final JanusGraphManagement createIndexMgmt = graph.openManagement();
262     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
263
264     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
265       return; //Ignore, index already exists
266     }
267     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
268
269     createIndexMgmt.commit();
270     graph.tx().commit();
271
272     awaitIndexCreation(index.getName());
273   }
274
275   @Override
276   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
277     if (isShutdown()) {
278       throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
279     }
280
281     final JanusGraphManagement retrieveIndexMgmt = getGraph().openManagement();
282     final JanusGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
283
284     if (index == null) {
285       return Optional.empty();
286     }
287     if (index.getIndexedElement() != JanusGraphEdge.class) {
288       return Optional.empty();
289     }
290
291     return Optional.of(ChampRelationshipIndex.create()
292         .ofName(indexName)
293         .onType(ChampObject.ReservedTypes.ANY.toString())
294         .forField(index.getFieldKeys()[0].name())
295         .build());
296   }
297
298   @Override
299   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
300     if (isShutdown()) {
301       throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
302     }
303
304     final JanusGraphManagement createIndexMgmt = getGraph().openManagement();
305     final Iterator<JanusGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
306
307     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex>() {
308
309       private ChampRelationshipIndex next;
310
311       @Override
312       public boolean hasNext() {
313         if (indices.hasNext()) {
314           final JanusGraphIndex index = indices.next();
315
316           next = ChampRelationshipIndex.create()
317               .ofName(index.name())
318               .onType(ChampRelationship.ReservedTypes.ANY.toString())
319               .forField(index.getFieldKeys()[0].name())
320               .build();
321           return true;
322         }
323
324         next = null;
325         return false;
326       }
327
328       @Override
329       public ChampRelationshipIndex next() {
330         if (next == null) {
331           throw new NoSuchElementException();
332         }
333
334         return next;
335       }
336     };
337
338     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
339         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
340   }
341
342   @Override
343   public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
344     if (isShutdown()) {
345       throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
346     }
347
348     throw new UnsupportedOperationException("Cannot delete indices using the JanusChampImpl");
349   }
350
351   private Cardinality getJanusCardinality(ChampCardinality cardinality) {
352     switch (cardinality) {
353       case LIST:
354         return Cardinality.LIST;
355       case SET:
356         return Cardinality.SET;
357       case SINGLE:
358         return Cardinality.SINGLE;
359       default:
360         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
361     }
362   }
363
364   private void awaitIndexCreation(String indexName) {
365     //Wait for the index to become available
366     try {
367       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
368           .status(SchemaStatus.ENABLED)
369           .timeout(1, ChronoUnit.SECONDS)
370           .call()
371           .getSucceeded()) {
372         return; //Empty graphs immediately ENABLE indices
373       }
374
375       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
376           .status(SchemaStatus.REGISTERED)
377           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
378           .call()
379           .getSucceeded()) {
380         LOGGER.warn("Object index was created, but timed out while waiting for it to be registered");
381         return;
382       }
383     } catch (InterruptedException e) {
384       LOGGER.warn("Interrupted while waiting for object index creation status");
385       return;
386     }
387
388     //Reindex the existing data
389
390     try {
391       final JanusGraphManagement updateIndexMgmt = graph.openManagement();
392       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName), SchemaAction.REINDEX).get();
393       updateIndexMgmt.commit();
394     } catch (InterruptedException e) {
395       LOGGER.warn("Interrupted while reindexing for object index");
396       return;
397     } catch (ExecutionException e) {
398       LOGGER.warn("Exception occurred during reindexing procedure for creating object index " + indexName, e);
399     }
400
401     try {
402       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
403           .status(SchemaStatus.ENABLED)
404           .timeout(10, ChronoUnit.MINUTES)
405           .call();
406     } catch (InterruptedException e) {
407       LOGGER.warn("Interrupted while waiting for index to transition to ENABLED state");
408       return;
409     }
410   }
411
  /**
   * Returns the capabilities of this implementation.
   *
   * @return the shared {@code CAPABILITIES} instance, which reports that neither object nor
   *     relationship indices can be deleted
   */
  @Override
  public ChampCapabilities capabilities() {
    return CAPABILITIES;
  }
416
417   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
418     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
419
420     final ChampSchema currentSchema = retrieveSchema();
421     final JanusGraphManagement mgmt = getGraph().openManagement();
422
423     try {
424       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
425         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
426           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
427
428           if (currentObjConstraint.isPresent()) {
429             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
430
431             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
432               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
433             }
434           }
435
436           final String newPropertyKeyName = propConstraint.getField().getName();
437
438           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
439
440           mgmt.makePropertyKey(newPropertyKeyName)
441               .dataType(propConstraint.getField().getJavaType())
442               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
443               .make();
444         }
445       }
446
447       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
448
449         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
450
451         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
452
453           if (currentRelConstraint.isPresent()) {
454             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
455
456             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
457               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
458             }
459           }
460
461           final String newPropertyKeyName = propConstraint.getField().getName();
462
463           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Janus to see if another node created this property key
464
465           mgmt.makePropertyKey(newPropertyKeyName)
466               .dataType(propConstraint.getField().getJavaType())
467               .cardinality(getJanusCardinality(propConstraint.getCardinality()))
468               .make();
469         }
470
471         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
472
473         if (edgeLabel != null) {
474           mgmt.makeEdgeLabel(relConstraint.getType())
475               .directed()
476               .make();
477         }
478       }
479
480       mgmt.commit();
481
482       super.storeSchema(schema);
483     } catch (SchemaViolationException | ChampSchemaViolationException e) {
484       mgmt.rollback();
485       throw new ChampSchemaViolationException(e);
486     }
487   }
488   
489         public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
490                 return query.hasLabel(type);
491         }
492 }