Enhance SchemaGenerator to also generate indices for relationships
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / dbgen / SchemaGenerator.java
1 /*
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Modifications Copyright © 2024 DEUTSCHE TELEKOM AG.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.aai.dbgen;
24
25 import com.google.common.collect.Multimap;
26
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Objects;
35 import java.util.Optional;
36 import java.util.Set;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.ExecutionException;
39 import java.util.function.Function;
40 import java.util.stream.Collectors;
41
42 import org.apache.tinkerpop.gremlin.structure.Direction;
43 import org.apache.tinkerpop.gremlin.structure.Vertex;
44 import org.janusgraph.core.Cardinality;
45 import org.janusgraph.core.EdgeLabel;
46 import org.janusgraph.core.JanusGraph;
47 import org.janusgraph.core.Multiplicity;
48 import org.janusgraph.core.PropertyKey;
49 import org.janusgraph.core.schema.ConsistencyModifier;
50 import org.janusgraph.core.schema.JanusGraphIndex;
51 import org.janusgraph.core.schema.JanusGraphManagement;
52 import org.janusgraph.core.schema.JanusGraphManagement.IndexJobFuture;
53 import org.janusgraph.core.schema.RelationTypeIndex;
54 import org.janusgraph.core.schema.SchemaAction;
55 import org.janusgraph.core.schema.SchemaStatus;
56 import org.janusgraph.graphdb.database.StandardJanusGraph;
57 import org.janusgraph.graphdb.database.management.ManagementSystem;
58 import org.janusgraph.graphdb.database.management.RelationIndexStatusReport;
59 import org.onap.aai.config.SpringContextAware;
60 import org.onap.aai.edges.EdgeIngestor;
61 import org.onap.aai.edges.EdgeRule;
62 import org.onap.aai.edges.exceptions.EdgeRuleNotFoundException;
63 import org.onap.aai.introspection.Introspector;
64 import org.onap.aai.introspection.LoaderUtil;
65 import org.onap.aai.logging.LogFormatTools;
66 import org.onap.aai.schema.enums.PropertyMetadata;
67 import org.onap.aai.util.AAIConfig;
68 import org.onap.aai.util.AAIConstants;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 public class SchemaGenerator {
73
    // Logger shared by all static methods of this class.
    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaGenerator.class);
    // Number of index recovery attempts made so far. Incremented by both ensureEnabledIndexState
    // and ensureValidIndexState, each of which compares it against its own upper limit.
    private static int indexRecoveryRetryCounter = 0;
76
77     private SchemaGenerator() {
78     }
79
80     /**
81      * Load schema into JanusGraph.
82      *
83      * @param graphMgmt
84      *        the graph mgmt
85      */
86     public static List<String> loadSchemaIntoJanusGraph(final JanusGraph graph, String backend, boolean dbNotEmpty) {
87         JanusGraphManagement graphMgmt = graph.openManagement();
88         final List<String> elementsToReindex = new ArrayList<>();
89
90         try {
91             AAIConfig.init();
92         } catch (Exception ex) {
93             LOGGER.error(" ERROR - Could not run AAIConfig.init(). {}", LogFormatTools.getStackTop(ex));
94             System.exit(1);
95         }
96
97         // NOTE - JanusGraph 0.5.3 doesn't keep a list of legal node Labels.
98         // They are only used when a vertex is actually being created.
99         // JanusGraph 1.1 will keep track (we think).
100
101         // Use EdgeRules to make sure edgeLabels are defined in the db. NOTE:
102         // the multiplicty used here is
103         // always "MULTI". This is not the same as our internal "Many2Many",
104         // "One2One", "One2Many" or "Many2One"
105         // We use the same edge-label for edges between many different types of
106         // nodes and our internal
107         // multiplicty definitions depends on which two types of nodes are being
108         // connected.
109
110         final Map<String, Introspector> objs = LoaderUtil.getLatestVersion().getAllObjects();
111         final Map<String, PropertyKey> seenProps = new HashMap<>();
112         
113         for (Introspector obj : objs.values()) {
114             createSchemaForObject(graphMgmt, seenProps, obj);
115         }
116
117         makeEdgeLabels(graphMgmt, elementsToReindex, dbNotEmpty);
118
119         LOGGER.info("-- About to call graphMgmt commit");
120         if (backend != null) {
121             LOGGER.info("Successfully loaded the schema to {}", backend);
122         }
123
124         graphMgmt.commit();
125         return elementsToReindex;
126     }
127
128     public static void reindexEdgeIndexes(final JanusGraph graph, Collection<String> edgeLabels) {
129         graph.tx().rollback();
130         if(edgeLabels.isEmpty()) {
131             LOGGER.info("Nothing to reindex.");
132             return;
133         }
134
135         ensureValidIndexState(graph, edgeLabels);
136
137         awaitRelationIndexStatus(graph, edgeLabels,SchemaStatus.REGISTERED);
138
139         LOGGER.info("Attempting to transition indexes in REGISTERED state to ENABLED");
140         updateRelationIndexes(graph, edgeLabels, SchemaAction.REINDEX);
141
142         ensureEnabledIndexState(graph, edgeLabels);
143     }
144
145     /**
146      * Indices need to be in status REGISTERED or ENABLED to allow reindexing.
147      * @param graph
148      * @param edgeLabels
149      */
150     private static void ensureEnabledIndexState(final JanusGraph graph, Collection<String> edgeLabels) {
151         JanusGraphManagement graphMgmt = graph.openManagement();
152         List<String> registeredIndexes = new ArrayList<>();
153         for(String label: edgeLabels) {
154             EdgeLabel relation = graphMgmt.getEdgeLabel(label);
155             RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
156             SchemaStatus indexStatus = index.getIndexStatus();
157             if(indexStatus.equals(SchemaStatus.REGISTERED)) {
158                 LOGGER.info("Detected relation index [{}] that is not yet in ENABLED state", relation.name());
159                 registeredIndexes.add(label);
160             }
161         }
162         graphMgmt.commit();
163
164         if(indexRecoveryRetryCounter <= 8 && !registeredIndexes.isEmpty()) {
165             indexRecoveryRetryCounter++;
166             LOGGER.info("[{}] indexes not yet in ENABLED state", registeredIndexes.size());
167             awaitRelationIndexStatus(graph, registeredIndexes,SchemaStatus.ENABLED);
168
169             ensureEnabledIndexState(graph, edgeLabels); // recursively call to make sure there is no invalid state
170         } else {
171             LOGGER.info("All indexes are in ENABLED state, exiting.");
172             return;
173         }
174     }
175
176     /**
177      * Indices need to be in status REGISTERED or ENABLED to allow reindexing.
178      * @param graph
179      * @param edgeLabels
180      */
181     private static void ensureValidIndexState(final JanusGraph graph, Collection<String> edgeLabels) {
182         JanusGraphManagement graphMgmt = graph.openManagement();
183         List<String> installedIndexes = new ArrayList<>();
184         List<String> disabledIndexes = new ArrayList<>();
185         for(String label: edgeLabels) {
186             EdgeLabel relation = graphMgmt.getEdgeLabel(label);
187             RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
188             SchemaStatus indexStatus = index.getIndexStatus();
189             if(indexStatus.equals(SchemaStatus.INSTALLED)) {
190                 LOGGER.info("Detected relation index [{}] with invalid status [{}]", relation.name(), indexStatus);
191                 installedIndexes.add(label);
192             } else if(indexStatus.equals(SchemaStatus.DISABLED)) {
193                 LOGGER.info("Detected relation index [{}] with invalid status [{}]", relation.name(), indexStatus);
194                 disabledIndexes.add(label);
195             }
196         }
197         graphMgmt.commit();
198
199         if(indexRecoveryRetryCounter <= 300 && (!installedIndexes.isEmpty() || !disabledIndexes.isEmpty())) {
200             indexRecoveryRetryCounter++;
201             if(!installedIndexes.isEmpty()) {
202                 LOGGER.info("Attempting to transition indexes in INSTALLED state to REGISTERED");
203                 updateRelationIndexes(graph, installedIndexes, SchemaAction.REGISTER_INDEX);
204                 awaitRelationIndexStatus(graph, edgeLabels,SchemaStatus.REGISTERED);
205             }
206             if(!disabledIndexes.isEmpty()) {
207                 LOGGER.info("Attempting to transition indexes in DISABLED state to ENABLED");
208                 updateRelationIndexes(graph, disabledIndexes, SchemaAction.ENABLE_INDEX);
209                 awaitRelationIndexStatus(graph, edgeLabels,SchemaStatus.ENABLED);
210             }
211             ensureValidIndexState(graph, edgeLabels); // recursively call to make sure there is no invalid state
212         } else {
213             return;
214         }
215     }
216
217     private static void createSchemaForObject(final JanusGraphManagement graphMgmt, Map<String, PropertyKey> seenProps,
218             Introspector obj) {
219         for (String propName : obj.getProperties()) {
220             String dbPropName = propName;
221             Optional<String> alias = obj.getPropertyMetadata(propName, PropertyMetadata.DB_ALIAS);
222             if (alias.isPresent()) {
223                 dbPropName = alias.get();
224             }
225             if (graphMgmt.containsRelationType(dbPropName)) {
226                 LOGGER.debug(" PropertyKey  [{}] already existed in the DB. ", dbPropName);
227             } else {
228                 Class<?> type = obj.getClass(propName);
229                 Cardinality cardinality = Cardinality.SINGLE;
230                 boolean process = false;
231                 if (obj.isListType(propName) && obj.isSimpleGenericType(propName)) {
232                     cardinality = Cardinality.SET;
233                     type = obj.getGenericTypeClass(propName);
234                     process = true;
235                 } else if (obj.isSimpleType(propName)) {
236                     process = true;
237                 }
238
239                 if (process) {
240
241                     LOGGER.info("Creating PropertyKey: [{}], [{}], [{}]", dbPropName, type.getSimpleName(),
242                             cardinality);
243                     PropertyKey propertyKey;
244                     if (!seenProps.containsKey(dbPropName)) {
245                         propertyKey = graphMgmt.makePropertyKey(dbPropName).dataType(type).cardinality(cardinality)
246                                 .make();
247                         if (dbPropName.equals("aai-uri")) {
248                             String aai_uri_lock_enabled = AAIConfig.get(AAIConstants.AAI_LOCK_URI_ENABLED, "false");
249                             LOGGER.info(" Info: aai_uri_lock_enabled:" + aai_uri_lock_enabled);
250                             if ("true".equals(aai_uri_lock_enabled)) {
251                                 LOGGER.info(" Lock is being set for aai-uri Property.");
252                                 graphMgmt.setConsistency(propertyKey, ConsistencyModifier.LOCK);
253                             }
254                         } else if (dbPropName.equals("resource-version")) {
255                             String aai_rv_lock_enabled = AAIConfig.get(AAIConstants.AAI_LOCK_RV_ENABLED, "false");
256                             LOGGER.info(" Info: aai_rv_lock_enabled:" + aai_rv_lock_enabled);
257                             if ("true".equals(aai_rv_lock_enabled)) {
258                                 LOGGER.info(" Lock is being set for resource-version Property.");
259                                 graphMgmt.setConsistency(propertyKey, ConsistencyModifier.LOCK);
260                             }
261                         }
262                         seenProps.put(dbPropName, propertyKey);
263                     } else {
264                         propertyKey = seenProps.get(dbPropName);
265                     }
266                     if (graphMgmt.containsGraphIndex(dbPropName)) {
267                         LOGGER.debug(" Index  [{}] already existed in the DB. ", dbPropName);
268                     } else {
269                         if (obj.getIndexedProperties().contains(propName)) {
270                             createIndexForProperty(graphMgmt, obj, propName, dbPropName, propertyKey);
271                         } else {
272                             LOGGER.info("No index added for PropertyKey: [{}]", dbPropName);
273                         }
274                     }
275                 }
276             }
277         }
278     }
279
280     private static void createIndexForProperty(final JanusGraphManagement graphMgmt, Introspector obj, String propName,
281             String dbPropName, PropertyKey propertyKey) {
282         JanusGraphIndex indexG = null;
283         if (obj.getUniqueProperties().contains(propName)) {
284             LOGGER.info("Add Unique index for PropertyKey: [{}]", dbPropName);
285             indexG = graphMgmt.buildIndex(dbPropName, Vertex.class).addKey(propertyKey).unique()
286                     .buildCompositeIndex();
287         } else {
288             LOGGER.info("Add index for PropertyKey: [{}]", dbPropName);
289             indexG = graphMgmt.buildIndex(dbPropName, Vertex.class).addKey(propertyKey)
290                     .buildCompositeIndex();
291         }
292         if (indexG != null && dbPropName.equals("aai-uri")) {
293             String aai_uri_lock_enabled =
294                     AAIConfig.get(AAIConstants.AAI_LOCK_URI_ENABLED, "false");
295             LOGGER.info(" Info:: aai_uri_lock_enabled:" + aai_uri_lock_enabled);
296             if ("true".equals(aai_uri_lock_enabled)) {
297                 LOGGER.info("Lock is being set for aai-uri Index.");
298                 graphMgmt.setConsistency(indexG, ConsistencyModifier.LOCK);
299             }
300         } else if (indexG != null && dbPropName.equals("resource-version")) {
301             String aai_rv_lock_enabled =
302                     AAIConfig.get(AAIConstants.AAI_LOCK_RV_ENABLED, "false");
303             LOGGER.info(" Info:: aai_rv_lock_enabled:" + aai_rv_lock_enabled);
304             if ("true".equals(aai_rv_lock_enabled)) {
305                 LOGGER.info("Lock is being set for resource-version Index.");
306                 graphMgmt.setConsistency(indexG, ConsistencyModifier.LOCK);
307             }
308         }
309     }
310
311     /**
312      * Debug method to print current index states.
313      * This can help diagnosing indexes that are stuck in INSTALLED state.
314      * @param graph
315      */
316     public static void printCurrentRelationIndexStates(JanusGraph graph) {
317         JanusGraphManagement graphMgmt = graph.openManagement();
318         EdgeIngestor edgeIngestor = SpringContextAware.getBean(EdgeIngestor.class);
319         try {
320             Set<String> edgeLabels = Optional.ofNullable(edgeIngestor.getAllCurrentRules())
321                         .map(collectValues(EdgeRule::getLabel)).orElseGet(HashSet::new);
322             edgeLabels.stream()
323                 .forEach(label -> {
324                     EdgeLabel relation = graphMgmt.getEdgeLabel(label);
325                     RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
326                     LOGGER.info("Index state of relation index [{}] is [{}]",label, index.getIndexStatus());
327                 });
328         } catch (EdgeRuleNotFoundException e) {
329             e.printStackTrace();
330         }
331     }
332
333     /**
334      * Jointly wait for a set of relation indexes to change status.
335      * Use this after calling {@link JanusGraphManagement#updateIndex(org.janusgraph.core.schema.Index, SchemaAction)}
336      * @param graph the JanusGraph
337      * @param labels the names of the indexes
338      * @param newStatus the new SchemaStatus
339      */
340     private static void awaitRelationIndexStatus(JanusGraph graph, Collection<String> labels, SchemaStatus newStatus) {
341         LOGGER.info("Awaiting index status [{}]", newStatus);;
342         CompletableFuture<RelationIndexStatusReport>[] awaits = labels.stream()
343             .map(label -> 
344                 CompletableFuture.supplyAsync(() -> {
345                     try {
346                         return ManagementSystem
347                             .awaitRelationIndexStatus(graph, label, label)
348                             .status(newStatus)
349                             .call();
350                     } catch (InterruptedException e) {
351                         e.printStackTrace();
352                         return null;
353                     }
354                 }))
355             .toArray(CompletableFuture[]::new);
356         try {
357             CompletableFuture.allOf(awaits).get();
358         } catch (InterruptedException | ExecutionException e) {
359             LOGGER.error("Error while waiting for change in relation index status");
360             e.printStackTrace();
361         }
362         LOGGER.info("Completed waiting for index status [{}]", newStatus);
363     }
364
365     private static void updateRelationIndexes(JanusGraph graph, Collection<String> labels, SchemaAction updateAction) {
366         JanusGraphManagement graphMgmt = graph.openManagement();
367
368         CompletableFuture<IndexJobFuture>[] awaits = labels.stream()
369             .map(label -> 
370                 CompletableFuture.supplyAsync(() -> {
371                     EdgeLabel relation = graphMgmt.getEdgeLabel(label);
372                     RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
373                     LOGGER.info("Updating relation index [{}] status from [{}] to [{}]", relation.name(), index.getIndexStatus(), updateAction);
374                     return graphMgmt.updateIndex(index, updateAction);
375                 }))
376             .toArray(CompletableFuture[]::new);
377         try {
378             CompletableFuture.allOf(awaits).get();
379             LOGGER.info("Completed reindex actions");
380         } catch (InterruptedException | ExecutionException e) {
381             LOGGER.error("Error while waiting for change in relation index status");
382             e.printStackTrace();
383         }
384         graphMgmt.commit();
385     }
386
387     /**
388      * Radical approach to avoiding index update failures.
389      * Indexes can get stuck in INSTALLED state, when there are stale transactions or JanusGraph instances.
390      * This is because a state change needs to be acknowledged by all instances before transitioning.
391      * @param graph
392      * @return
393      */
394     private static void killTransactionsAndInstances(JanusGraph graph) {
395         graph.tx().rollback();
396         final StandardJanusGraph janusGraph = (StandardJanusGraph) graph;
397         janusGraph.getOpenTransactions().stream().forEach(transaction -> {
398                 LOGGER.debug("Closing open transaction [{}] before schema generation", transaction.toString());
399                 transaction.rollback();
400         });
401         
402         final JanusGraphManagement graphMgtForClosing = graph.openManagement();
403
404         Set<String> instances = graphMgtForClosing.getOpenInstances();
405         LOGGER.info("Number of open instances: {}", instances.size());
406         LOGGER.info("Currently open instances: [{}]", instances);
407         instances.stream()
408             // .filter(instance -> !instance.contains("graphadmin"))
409             .filter(instance -> !instance.contains("(current)"))
410             .forEach(instance -> {
411                     LOGGER.debug("Closing open JanusGraph instance [{}] before reindexing procedure", instance);
412                     graphMgtForClosing.forceCloseInstance(instance);
413             });
414         graphMgtForClosing.commit();
415     }
416
417     private static void makeEdgeLabels(JanusGraphManagement graphMgmt, List<String> elementsToReindex, boolean dbNotEmpty) {
418         try {
419             EdgeIngestor edgeIngestor = SpringContextAware.getBean(EdgeIngestor.class);
420
421             Set<String> labels = Optional.ofNullable(edgeIngestor.getAllCurrentRules())
422             .map(collectValues(EdgeRule::getLabel)).orElseGet(HashSet::new);
423
424             labels.forEach(label -> {
425                 if (graphMgmt.containsRelationType(label)) {
426                     LOGGER.debug(" EdgeLabel  [{}] already exists.", label);
427                 } else {
428                     LOGGER.debug("Making EdgeLabel: [{}]", label);
429                     graphMgmt.makeEdgeLabel(label).multiplicity(Multiplicity.MULTI).make();
430                 }
431                 EdgeLabel relation = graphMgmt.getEdgeLabel(label);
432                 RelationTypeIndex relationIndex = graphMgmt.getRelationIndex(relation, label);
433                 if(relationIndex == null) {
434                     LOGGER.debug("Creating edge index for relation: " + label);
435                     graphMgmt.buildEdgeIndex(relation, label, Direction.BOTH, graphMgmt.getPropertyKey("aai-node-type"));
436                     if(dbNotEmpty) {
437                         LOGGER.info("DB not empty. Registering edge [{}] for later reindexing.", label);
438                         elementsToReindex.add(relation.name());
439                     }
440                 } else if(!relationIndex.getIndexStatus().equals(SchemaStatus.ENABLED)) {
441                     LOGGER.info("Relation index was already created but is not in ENABLED status. Current status: [{}]", relationIndex.getIndexStatus());
442                     elementsToReindex.add(label);
443                 } else {
444                     LOGGER.debug("Edge index for label [{}] already exists", label);
445                 };
446             });
447         } catch (EdgeRuleNotFoundException e) {
448             LOGGER.error("Unable to find all rules {}", LogFormatTools.getStackTop(e));
449         }
450     }
451
452     /**
453      * Returns a function collecting all the values in a {@link com.google.common.collect.Multimap}
454      * given a mapping function
455      *
456      * @param f The mapper function
457      * @param <K> The type of key used by the provided {@link com.google.common.collect.Multimap}
458      * @param <V> The type of value used by the provided {@link com.google.common.collect.Multimap}
459      * @param <V0> The type which <V> is mapped to
460      */
461     private static <K, V, V0> Function<Multimap<K, V>, Set<V0>> collectValues(Function<V, V0> f) {
462         return as -> as.values().stream().map(f).collect(Collectors.toSet());
463     }
464
465 }