[AAI-2175] Change aai champ container processes to run as non-root on the host
[aai/champ.git] / champ-lib / champ-titan / src / main / java / org / onap / aai / champtitan / graph / impl / TitanChampGraphImpl.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champtitan.graph.impl;
22
23 import java.security.SecureRandom;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.NoSuchElementException;
32 import java.util.Optional;
33 import java.util.Spliterator;
34 import java.util.Spliterators;
35 import java.util.concurrent.ExecutionException;
36 import java.util.stream.Stream;
37 import java.util.stream.StreamSupport;
38
39 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
40 import org.apache.tinkerpop.gremlin.structure.Edge;
41 import org.apache.tinkerpop.gremlin.structure.Vertex;
42 import org.onap.aai.champcore.ChampCapabilities;
43 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.graph.impl.AbstractTinkerpopChampGraph;
46 import org.onap.aai.champcore.model.ChampCardinality;
47 import org.onap.aai.champcore.model.ChampField;
48 import org.onap.aai.champcore.model.ChampObject;
49 import org.onap.aai.champcore.model.ChampObjectConstraint;
50 import org.onap.aai.champcore.model.ChampObjectIndex;
51 import org.onap.aai.champcore.model.ChampPropertyConstraint;
52 import org.onap.aai.champcore.model.ChampRelationship;
53 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
54 import org.onap.aai.champcore.model.ChampRelationshipIndex;
55 import org.onap.aai.champcore.model.ChampSchema;
56 import org.onap.aai.champcore.schema.ChampSchemaEnforcer;
57 import org.onap.aai.champcore.schema.DefaultChampSchemaEnforcer;
58 import org.onap.aai.cl.api.Logger;
59 import org.onap.aai.cl.eelf.LoggerFactory;
60
61 import com.thinkaurelius.titan.core.Cardinality;
62 import com.thinkaurelius.titan.core.EdgeLabel;
63 import com.thinkaurelius.titan.core.PropertyKey;
64 import com.thinkaurelius.titan.core.SchemaViolationException;
65 import com.thinkaurelius.titan.core.TitanEdge;
66 import com.thinkaurelius.titan.core.TitanFactory;
67 import com.thinkaurelius.titan.core.TitanGraph;
68 import com.thinkaurelius.titan.core.TitanVertex;
69 import com.thinkaurelius.titan.core.schema.SchemaAction;
70 import com.thinkaurelius.titan.core.schema.SchemaStatus;
71 import com.thinkaurelius.titan.core.schema.TitanGraphIndex;
72 import com.thinkaurelius.titan.core.schema.TitanManagement;
73 import com.thinkaurelius.titan.core.schema.TitanManagement.IndexBuilder;
74 import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
75
76 public final class TitanChampGraphImpl extends AbstractTinkerpopChampGraph {
77
78   private static final Logger LOGGER = LoggerFactory.getInstance().getLogger(TitanChampGraphImpl.class);
79   private static final String TITAN_UNIQUE_SUFFIX = "graph.unique-instance-id-suffix";
80   private static final String TITAN_CASSANDRA_KEYSPACE = "storage.cassandra.keyspace";
81   private static final String TITAN_HBASE_TABLE = "storage.hbase.table";
82   private static final ChampSchemaEnforcer SCHEMA_ENFORCER = new DefaultChampSchemaEnforcer();
83   private static final int REGISTER_OBJECT_INDEX_TIMEOUT_SECS = 30;
84
85   private static final ChampCapabilities CAPABILITIES = new ChampCapabilities() {
86
87     @Override
88     public boolean canDeleteObjectIndices() {
89       return false;
90     }
91
92     @Override
93     public boolean canDeleteRelationshipIndices() {
94       return false;
95     }
96   };
97
98   private final TitanGraph graph;
99
100   public TitanChampGraphImpl(Builder builder) {
101     super(builder.graphConfiguration);
102     final TitanFactory.Builder titanGraphBuilder = TitanFactory.build();
103
104     for (Entry<String, Object> titanGraphProperty : builder.graphConfiguration.entrySet()) {
105       titanGraphBuilder.set(titanGraphProperty.getKey(), titanGraphProperty.getValue());
106     }
107
108     titanGraphBuilder.set(TITAN_UNIQUE_SUFFIX, ((short) new SecureRandom().nextInt(Short.MAX_VALUE)+""));
109     
110     final Object storageBackend = builder.graphConfiguration.get("storage.backend");
111
112     if ("cassandra".equals(storageBackend) ||
113         "cassandrathrift".equals(storageBackend) ||
114         "astyanax".equals(storageBackend) ||
115         "embeddedcassandra".equals(storageBackend)) {
116       titanGraphBuilder.set(TITAN_CASSANDRA_KEYSPACE, builder.graphName);
117     } else if ("hbase".equals(storageBackend)) {
118       titanGraphBuilder.set(TITAN_HBASE_TABLE, builder.graphName);
119     } else if ("berkleyje".equals(storageBackend)) {
120       throw new RuntimeException("storage.backend=berkleyje cannot handle multiple graphs on a single DB, not usable");
121     } else if ("inmemory".equals(storageBackend)) {
122     } else {
123       throw new RuntimeException("Unknown storage.backend=" + storageBackend);
124     }
125     
126     LOGGER.info(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_INFO,
127         "Instantiated data access layer for Titan graph data store with backend: " + storageBackend);
128
129     this.graph = titanGraphBuilder.open();
130   }
131
132   public static class Builder {
133     private final String graphName;
134
135     private final Map<String, Object> graphConfiguration = new HashMap<String, Object> ();
136
137     public Builder(String graphName) {
138       this.graphName = graphName;
139     }
140     
141     public Builder(String graphName, Map<String, Object> properties) {
142         this.graphName = graphName;
143         properties(properties);
144     }
145
146     public Builder properties(Map<String, Object> properties) {
147       if (properties.containsKey(TITAN_CASSANDRA_KEYSPACE))
148         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
149             + " in initial configuration - this path is used"
150             + " to specify graph names");
151
152       this.graphConfiguration.putAll(properties);
153       return this;
154     }
155
156     public Builder property(String path, Object value) {
157       if (path.equals(TITAN_CASSANDRA_KEYSPACE))
158         throw new IllegalArgumentException("Cannot use path " + TITAN_CASSANDRA_KEYSPACE
159             + " in initial configuration - this path is used"
160             + " to specify graph names");
161       graphConfiguration.put(path, value);
162       return this;
163     }
164
165     public TitanChampGraphImpl build() {
166       return new TitanChampGraphImpl(this);
167     }
168   }
169
170   @Override
171   protected TitanGraph getGraph() {
172     return graph;
173   }
174
175   @Override
176   protected ChampSchemaEnforcer getSchemaEnforcer() {
177     return SCHEMA_ENFORCER;
178   }
179
180   public void executeStoreObjectIndex(ChampObjectIndex index) {
181     if (isShutdown()) throw new IllegalStateException("Cannot call storeObjectIndex() after shutdown has been initiated");
182
183     final TitanGraph graph = getGraph();
184     final TitanManagement createIndexMgmt = graph.openManagement();
185
186     if (createIndexMgmt.getGraphIndex(index.getName()) != null) {
187       createIndexMgmt.rollback();
188       return; //Ignore, index already exists
189     }
190
191     IndexBuilder ib = createIndexMgmt.buildIndex(index.getName(), Vertex.class);
192     for (ChampField field : index.getFields()) {
193       PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(field.getName());
194       ib = ib.addKey(pk);
195     }
196     ib.buildCompositeIndex();
197     createIndexMgmt.commit();
198     graph.tx().commit();
199
200     awaitIndexCreation(index.getName());
201   }
202
203   @Override
204   public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
205     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndex() after shutdown has been initiated");
206
207     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
208     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
209
210     if (index == null) return Optional.empty();
211     if (index.getIndexedElement() != TitanVertex.class) return Optional.empty();
212     List<String> fieldNames = new ArrayList<String>();
213     for (int i = 0; i < index.getFieldKeys().length; i++) {
214       fieldNames.add(index.getFieldKeys()[i].name());
215     }
216
217     return Optional.of(ChampObjectIndex.create()
218         .ofName(indexName)
219         .onType(ChampObject.ReservedTypes.ANY.toString())
220         .forFields(fieldNames)
221         .build());
222   }
223
224   @Override
225   public Stream<ChampObjectIndex> retrieveObjectIndices() {
226     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveObjectIndices() after shutdown has been initiated");
227
228     final TitanManagement createIndexMgmt = getGraph().openManagement();
229     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Vertex.class).iterator();
230
231     final Iterator<ChampObjectIndex> objIter = new Iterator<ChampObjectIndex> () {
232
233       private ChampObjectIndex next;
234
235       @Override
236       public boolean hasNext() {
237         if (indices.hasNext()) {
238           final TitanGraphIndex index = indices.next();
239           
240           List<String> fieldNames = new ArrayList<String>();
241           for (int i = 0; i < index.getFieldKeys().length; i++) {
242             fieldNames.add(index.getFieldKeys()[i].name());
243           }
244
245           next = ChampObjectIndex.create()
246               .ofName(index.name())
247               .onType(ChampObject.ReservedTypes.ANY.toString())
248               .forFields(fieldNames)
249               .build();
250           return true;
251         }
252
253         next = null;
254         return false;
255       }
256
257       @Override
258       public ChampObjectIndex next() {
259         if (next == null) throw new NoSuchElementException();
260
261         return next;
262       }
263     };
264
265     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
266         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
267   }
268
269   public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
270     if (isShutdown()) throw new IllegalStateException("Cannot call deleteObjectIndex() after shutdown has been initiated");
271
272     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
273   }
274
275   public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
276     if (isShutdown()) throw new IllegalStateException("Cannot call storeRelationshipIndex() after shutdown has been initiated");
277
278     final TitanGraph graph = getGraph();
279     final TitanManagement createIndexMgmt = graph.openManagement();
280     final PropertyKey pk = createIndexMgmt.getOrCreatePropertyKey(index.getField().getName());
281
282     if (createIndexMgmt.getGraphIndex(index.getName()) != null) return; //Ignore, index already exists
283     createIndexMgmt.buildIndex(index.getName(), Edge.class).addKey(pk).buildCompositeIndex();
284
285     createIndexMgmt.commit();
286     graph.tx().commit();
287
288     awaitIndexCreation(index.getName());
289   }
290
291   @Override
292   public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
293     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndex() after shutdown has been initiated");
294
295     final TitanManagement retrieveIndexMgmt = getGraph().openManagement();
296     final TitanGraphIndex index = retrieveIndexMgmt.getGraphIndex(indexName);
297
298     if (index == null) return Optional.empty();
299     if (index.getIndexedElement() != TitanEdge.class) return Optional.empty();
300
301     return Optional.of(ChampRelationshipIndex.create()
302         .ofName(indexName)
303         .onType(ChampObject.ReservedTypes.ANY.toString())
304         .forField(index.getFieldKeys()[0].name())
305         .build());
306   }
307
308   @Override
309   public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
310     if (isShutdown()) throw new IllegalStateException("Cannot call retrieveRelationshipIndices() after shutdown has been initiated");
311
312     final TitanManagement createIndexMgmt = getGraph().openManagement();
313     final Iterator<TitanGraphIndex> indices = createIndexMgmt.getGraphIndexes(Edge.class).iterator();
314
315     final Iterator<ChampRelationshipIndex> objIter = new Iterator<ChampRelationshipIndex> () {
316
317       private ChampRelationshipIndex next;
318
319       @Override
320       public boolean hasNext() {
321         if (indices.hasNext()) {
322           final TitanGraphIndex index = indices.next();
323
324           next = ChampRelationshipIndex.create()
325               .ofName(index.name())
326               .onType(ChampRelationship.ReservedTypes.ANY.toString())
327               .forField(index.getFieldKeys()[0].name())
328               .build();
329           return true;
330         }
331
332         next = null;
333         return false;
334       }
335
336       @Override
337       public ChampRelationshipIndex next() {
338         if (next == null) throw new NoSuchElementException();
339
340         return next;
341       }
342     };
343
344     return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
345         objIter, Spliterator.ORDERED | Spliterator.NONNULL), false);
346   }
347
348   public void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
349     if (isShutdown()) throw new IllegalStateException("Cannot call deleteRelationshipIndex() after shutdown has been initiated");
350
351     throw new UnsupportedOperationException("Cannot delete indices using the TitanChampImpl");
352   }
353
354   private Cardinality getTitanCardinality(ChampCardinality cardinality) {
355     switch (cardinality) {
356       case LIST:
357         return Cardinality.LIST;
358       case SET:
359         return Cardinality.SET;
360       case SINGLE:
361         return Cardinality.SINGLE;
362       default:
363         throw new RuntimeException("Unknown ChampCardinality " + cardinality);
364     }
365   }
366
367   private void awaitIndexCreation(String indexName) {
368     //Wait for the index to become available
369     try {
370       if (ManagementSystem.awaitGraphIndexStatus(graph, indexName)
371           .status(SchemaStatus.ENABLED)
372           .timeout(1, ChronoUnit.SECONDS)
373           .call()
374           .getSucceeded()) {
375         return; //Empty graphs immediately ENABLE indices
376       }
377
378       if (!ManagementSystem.awaitGraphIndexStatus(graph, indexName)
379           .status(SchemaStatus.REGISTERED)
380           .timeout(REGISTER_OBJECT_INDEX_TIMEOUT_SECS, ChronoUnit.SECONDS)
381           .call()
382           .getSucceeded()) {
383         LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
384             "Object index was created, but timed out while waiting for it to be registered");
385         return;
386       }
387     } catch (InterruptedException e) {
388       LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
389           "Interrupted while waiting for object index creation status");
390       Thread.currentThread().interrupt();
391       return;
392     }
393
394     //Reindex the existing data
395
396     try {
397       final TitanManagement updateIndexMgmt = graph.openManagement();
398       updateIndexMgmt.updateIndex(updateIndexMgmt.getGraphIndex(indexName),SchemaAction.REINDEX).get();
399       updateIndexMgmt.commit();
400     } catch (InterruptedException e) {
401       LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
402           "Interrupted while reindexing for object index");
403       Thread.currentThread().interrupt();
404       return;
405     } catch (ExecutionException e) {
406       LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN,
407           "Exception occurred during reindexing procedure for creating object index " + indexName + ". " + e.getMessage());
408     }
409
410     try {
411       ManagementSystem.awaitGraphIndexStatus(graph, indexName)
412           .status(SchemaStatus.ENABLED)
413           .timeout(10, ChronoUnit.MINUTES)
414           .call();
415     } catch (InterruptedException e) {
416       LOGGER.warn(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_WARN, 
417           "Interrupted while waiting for index to transition to ENABLED state");
418       Thread.currentThread().interrupt();
419       return;
420     }
421   }
422
423   @Override
424   public ChampCapabilities capabilities() {
425     return CAPABILITIES;
426   }
427
428   @Override
429   public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
430     if (isShutdown()) throw new IllegalStateException("Cannot call storeSchema() after shutdown has been initiated");
431
432     final ChampSchema currentSchema = retrieveSchema();
433     final TitanManagement mgmt = getGraph().openManagement();
434
435     try {
436       for (ChampObjectConstraint objConstraint : schema.getObjectConstraints().values()) {
437         for (ChampPropertyConstraint propConstraint : objConstraint.getPropertyConstraints()) {
438           final Optional<ChampObjectConstraint> currentObjConstraint = currentSchema.getObjectConstraint(objConstraint.getType());
439
440           if (currentObjConstraint.isPresent()) {
441             final Optional<ChampPropertyConstraint> currentPropConstraint = currentObjConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
442
443             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
444               throw new ChampSchemaViolationException("Cannot update already existing property on object type " + objConstraint.getType() + ": " + propConstraint);
445             }
446           }
447
448           final String newPropertyKeyName = propConstraint.getField().getName();
449
450           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
451
452           mgmt.makePropertyKey(newPropertyKeyName)
453               .dataType(propConstraint.getField().getJavaType())
454               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
455               .make();
456         }
457       }
458
459       for (ChampRelationshipConstraint relConstraint : schema.getRelationshipConstraints().values()) {
460
461         final Optional<ChampRelationshipConstraint> currentRelConstraint = currentSchema.getRelationshipConstraint(relConstraint.getType());
462
463         for (ChampPropertyConstraint propConstraint : relConstraint.getPropertyConstraints()) {
464
465           if (currentRelConstraint.isPresent()) {
466             final Optional<ChampPropertyConstraint> currentPropConstraint = currentRelConstraint.get().getPropertyConstraint(propConstraint.getField().getName());
467
468             if (currentPropConstraint.isPresent() && currentPropConstraint.get().compareTo(propConstraint) != 0) {
469               throw new ChampSchemaViolationException("Cannot update already existing property on relationship type " + relConstraint.getType());
470             }
471           }
472
473           final String newPropertyKeyName = propConstraint.getField().getName();
474
475           if (mgmt.getPropertyKey(newPropertyKeyName) != null) continue; //Check Titan to see if another node created this property key
476
477           mgmt.makePropertyKey(newPropertyKeyName)
478               .dataType(propConstraint.getField().getJavaType())
479               .cardinality(getTitanCardinality(propConstraint.getCardinality()))
480               .make();
481         }
482
483         final EdgeLabel edgeLabel = mgmt.getEdgeLabel(relConstraint.getType());
484
485         if (edgeLabel != null) mgmt.makeEdgeLabel(relConstraint.getType())
486             .directed()
487             .make();
488       }
489
490       mgmt.commit();
491
492       super.storeSchema(schema);
493     } catch (SchemaViolationException | ChampSchemaViolationException e) {
494       mgmt.rollback();
495       throw new ChampSchemaViolationException(e);
496     }
497   }
498
499         @Override
500         public GraphTraversal<?, ?> hasLabel(GraphTraversal<?, ?> query, Object type) {
501                 return query.hasLabel((String) type);
502         }
503
504   @Override
505   public void createDefaultIndexes() {
506     LOGGER.error(ChampTitanMsgs.TITAN_CHAMP_GRAPH_IMPL_ERROR,
507         "No default indexes being created");
508   }
509 }