2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright © 2024 DEUTSCHE TELEKOM AG.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.aai.dbgen;
25 import com.google.common.collect.Multimap;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.List;
34 import java.util.Objects;
35 import java.util.Optional;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.ExecutionException;
39 import java.util.function.Function;
40 import java.util.stream.Collectors;
42 import org.apache.tinkerpop.gremlin.structure.Direction;
43 import org.apache.tinkerpop.gremlin.structure.Vertex;
44 import org.janusgraph.core.Cardinality;
45 import org.janusgraph.core.EdgeLabel;
46 import org.janusgraph.core.JanusGraph;
47 import org.janusgraph.core.Multiplicity;
48 import org.janusgraph.core.PropertyKey;
49 import org.janusgraph.core.schema.ConsistencyModifier;
50 import org.janusgraph.core.schema.JanusGraphIndex;
51 import org.janusgraph.core.schema.JanusGraphManagement;
52 import org.janusgraph.core.schema.JanusGraphManagement.IndexJobFuture;
53 import org.janusgraph.core.schema.RelationTypeIndex;
54 import org.janusgraph.core.schema.SchemaAction;
55 import org.janusgraph.core.schema.SchemaStatus;
56 import org.janusgraph.graphdb.database.StandardJanusGraph;
57 import org.janusgraph.graphdb.database.management.ManagementSystem;
58 import org.janusgraph.graphdb.database.management.RelationIndexStatusReport;
59 import org.onap.aai.config.SpringContextAware;
60 import org.onap.aai.edges.EdgeIngestor;
61 import org.onap.aai.edges.EdgeRule;
62 import org.onap.aai.edges.exceptions.EdgeRuleNotFoundException;
63 import org.onap.aai.introspection.Introspector;
64 import org.onap.aai.introspection.LoaderUtil;
65 import org.onap.aai.logging.LogFormatTools;
66 import org.onap.aai.schema.enums.PropertyMetadata;
67 import org.onap.aai.util.AAIConfig;
68 import org.onap.aai.util.AAIConstants;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
// Holder for static helpers that create property keys, vertex indexes, edge labels and
// relation (edge) indexes through a JanusGraph management transaction.
72 public class SchemaGenerator {
74 private static final Logger LOGGER = LoggerFactory.getLogger(SchemaGenerator.class);
// Retry counter read and incremented by ensureValidIndexState (retries while it is <= 300)
// and by ensureEnabledIndexState (retries while it is <= 8). It is static and shared by both
// loops; neither of those two methods resets it.
75 private static int indexRecoveryRetryCounter = 0;
// Private constructor: all members visible in this class are static.
77 private SchemaGenerator() {
81 * Load schema into JanusGraph.
// Builds the schema inside one management transaction opened on the given graph: property keys
// and vertex indexes for every object of the latest schema version (createSchemaForObject),
// then edge labels and edge indexes (makeEdgeLabels).
// Parameters:
//   graph      - graph whose management transaction is used
//   backend    - only used in the visible code for the final log message; may be null
//   dbNotEmpty - forwarded to makeEdgeLabels
// Returns the list filled by makeEdgeLabels with the edge labels that need reindexing.
86 public static List<String> loadSchemaIntoJanusGraph(final JanusGraph graph, String backend, boolean dbNotEmpty) {
87 JanusGraphManagement graphMgmt = graph.openManagement();
88 final List<String> elementsToReindex = new ArrayList<>();
// NOTE(review): the try block guarded by this catch is not visible in this excerpt; judging by
// the log message it runs AAIConfig.init() - confirm, and confirm what follows the error log.
92 } catch (Exception ex) {
93 LOGGER.error(" ERROR - Could not run AAIConfig.init(). {}", LogFormatTools.getStackTop(ex));
97 // NOTE - JanusGraph 0.5.3 doesn't keep a list of legal node Labels.
98 // They are only used when a vertex is actually being created.
99 // JanusGraph 1.1 will keep track (we think).
101 // Use EdgeRules to make sure edgeLabels are defined in the db. NOTE:
102 // the multiplicity used here is
103 // always "MULTI". This is not the same as our internal "Many2Many",
104 // "One2One", "One2Many" or "Many2One"
105 // We use the same edge-label for edges between many different types of
106 // nodes and our internal
107 // multiplicity definitions depend on which two types of nodes are being
110 final Map<String, Introspector> objs = LoaderUtil.getLatestVersion().getAllObjects();
// Shared across all objects so a property name used by several objects reuses one PropertyKey.
111 final Map<String, PropertyKey> seenProps = new HashMap<>();
113 for (Introspector obj : objs.values()) {
114 createSchemaForObject(graphMgmt, seenProps, obj);
117 makeEdgeLabels(graphMgmt, elementsToReindex, dbNotEmpty);
// NOTE(review): the commit announced by the next log line is not visible in this excerpt.
119 LOGGER.info("-- About to call graphMgmt commit");
120 if (backend != null) {
121 LOGGER.info("Successfully loaded the schema to {}", backend);
125 return elementsToReindex;
128 public static void reindexEdgeIndexes(final JanusGraph graph, Collection<String> edgeLabels) {
129 graph.tx().rollback();
130 if(edgeLabels.isEmpty()) {
131 LOGGER.info("Nothing to reindex.");
135 ensureValidIndexState(graph, edgeLabels);
137 awaitRelationIndexStatus(graph, edgeLabels,SchemaStatus.REGISTERED);
139 LOGGER.info("Attempting to transition indexes in REGISTERED state to ENABLED");
140 updateRelationIndexes(graph, edgeLabels, SchemaAction.REINDEX);
142 ensureEnabledIndexState(graph, edgeLabels);
146 * After reindexing, relation indexes that are still in status REGISTERED are awaited until they reach ENABLED.
// Collects the labels whose relation index is still REGISTERED and, while the shared retry
// counter is <= 8 and such labels exist, waits for them to become ENABLED and then re-checks
// all labels by calling itself again.
// Parameters:
//   graph      - graph used to open the management transaction and to await status changes
//   edgeLabels - edge label names; each relation index is looked up under its label's name
150 private static void ensureEnabledIndexState(final JanusGraph graph, Collection<String> edgeLabels) {
// NOTE(review): no commit/rollback of this management transaction is visible in this excerpt.
151 JanusGraphManagement graphMgmt = graph.openManagement();
152 List<String> registeredIndexes = new ArrayList<>();
153 for(String label: edgeLabels) {
154 EdgeLabel relation = graphMgmt.getEdgeLabel(label);
155 RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
// NOTE(review): makeEdgeLabels treats a null result of getRelationIndex as "index missing";
// no such guard is visible here before the dereference.
156 SchemaStatus indexStatus = index.getIndexStatus();
157 if(indexStatus.equals(SchemaStatus.REGISTERED)) {
158 LOGGER.info("Detected relation index [{}] that is not yet in ENABLED state", relation.name());
159 registeredIndexes.add(label);
// The counter is the static field shared with ensureValidIndexState, so the budget of 8 is
// reduced by whatever that method already consumed.
164 if(indexRecoveryRetryCounter <= 8 && !registeredIndexes.isEmpty()) {
165 indexRecoveryRetryCounter++;
166 LOGGER.info("[{}] indexes not yet in ENABLED state", registeredIndexes.size());
167 awaitRelationIndexStatus(graph, registeredIndexes,SchemaStatus.ENABLED);
169 ensureEnabledIndexState(graph, edgeLabels); // recursively call to make sure there is no invalid state
// NOTE(review): the branch structure around this log is not visible; if it is the else branch of
// the retry condition it is also reached when retries are exhausted with indexes still REGISTERED.
171 LOGGER.info("All indexes are in ENABLED state, exiting.");
177 * Indices need to be in status REGISTERED or ENABLED to allow reindexing.
// Sorts the relation indexes of the given labels into INSTALLED and DISABLED ones and, while
// the shared retry counter is <= 300 and at least one such index exists, tries to move them
// forward (INSTALLED -> REGISTER_INDEX, DISABLED -> ENABLE_INDEX) and then re-checks all labels
// by calling itself again.
// Parameters:
//   graph      - graph used to open the management transaction and to update/await indexes
//   edgeLabels - edge label names; each relation index is looked up under its label's name
181 private static void ensureValidIndexState(final JanusGraph graph, Collection<String> edgeLabels) {
// NOTE(review): no commit/rollback of this management transaction is visible in this excerpt.
182 JanusGraphManagement graphMgmt = graph.openManagement();
183 List<String> installedIndexes = new ArrayList<>();
184 List<String> disabledIndexes = new ArrayList<>();
185 for(String label: edgeLabels) {
186 EdgeLabel relation = graphMgmt.getEdgeLabel(label);
187 RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
// NOTE(review): makeEdgeLabels treats a null result of getRelationIndex as "index missing";
// no such guard is visible here before the dereference.
188 SchemaStatus indexStatus = index.getIndexStatus();
189 if(indexStatus.equals(SchemaStatus.INSTALLED)) {
190 LOGGER.info("Detected relation index [{}] with invalid status [{}]", relation.name(), indexStatus);
191 installedIndexes.add(label);
192 } else if(indexStatus.equals(SchemaStatus.DISABLED)) {
193 LOGGER.info("Detected relation index [{}] with invalid status [{}]", relation.name(), indexStatus);
194 disabledIndexes.add(label);
// Uses the static counter that ensureEnabledIndexState also increments.
199 if(indexRecoveryRetryCounter <= 300 && (!installedIndexes.isEmpty() || !disabledIndexes.isEmpty())) {
200 indexRecoveryRetryCounter++;
201 if(!installedIndexes.isEmpty()) {
202 LOGGER.info("Attempting to transition indexes in INSTALLED state to REGISTERED");
203 updateRelationIndexes(graph, installedIndexes, SchemaAction.REGISTER_INDEX);
// NOTE(review): the update targets installedIndexes but the wait covers all edgeLabels - confirm.
204 awaitRelationIndexStatus(graph, edgeLabels,SchemaStatus.REGISTERED);
206 if(!disabledIndexes.isEmpty()) {
207 LOGGER.info("Attempting to transition indexes in DISABLED state to ENABLED");
208 updateRelationIndexes(graph, disabledIndexes, SchemaAction.ENABLE_INDEX);
// NOTE(review): the update targets disabledIndexes but the wait covers all edgeLabels - confirm.
209 awaitRelationIndexStatus(graph, edgeLabels,SchemaStatus.ENABLED);
211 ensureValidIndexState(graph, edgeLabels); // recursively call to make sure there is no invalid state
217 private static void createSchemaForObject(final JanusGraphManagement graphMgmt, Map<String, PropertyKey> seenProps,
219 for (String propName : obj.getProperties()) {
220 String dbPropName = propName;
221 Optional<String> alias = obj.getPropertyMetadata(propName, PropertyMetadata.DB_ALIAS);
222 if (alias.isPresent()) {
223 dbPropName = alias.get();
225 if (graphMgmt.containsRelationType(dbPropName)) {
226 LOGGER.debug(" PropertyKey [{}] already existed in the DB. ", dbPropName);
228 Class<?> type = obj.getClass(propName);
229 Cardinality cardinality = Cardinality.SINGLE;
230 boolean process = false;
231 if (obj.isListType(propName) && obj.isSimpleGenericType(propName)) {
232 cardinality = Cardinality.SET;
233 type = obj.getGenericTypeClass(propName);
235 } else if (obj.isSimpleType(propName)) {
241 LOGGER.info("Creating PropertyKey: [{}], [{}], [{}]", dbPropName, type.getSimpleName(),
243 PropertyKey propertyKey;
244 if (!seenProps.containsKey(dbPropName)) {
245 propertyKey = graphMgmt.makePropertyKey(dbPropName).dataType(type).cardinality(cardinality)
247 if (dbPropName.equals("aai-uri")) {
248 String aai_uri_lock_enabled = AAIConfig.get(AAIConstants.AAI_LOCK_URI_ENABLED, "false");
249 LOGGER.info(" Info: aai_uri_lock_enabled:" + aai_uri_lock_enabled);
250 if ("true".equals(aai_uri_lock_enabled)) {
251 LOGGER.info(" Lock is being set for aai-uri Property.");
252 graphMgmt.setConsistency(propertyKey, ConsistencyModifier.LOCK);
254 } else if (dbPropName.equals("resource-version")) {
255 String aai_rv_lock_enabled = AAIConfig.get(AAIConstants.AAI_LOCK_RV_ENABLED, "false");
256 LOGGER.info(" Info: aai_rv_lock_enabled:" + aai_rv_lock_enabled);
257 if ("true".equals(aai_rv_lock_enabled)) {
258 LOGGER.info(" Lock is being set for resource-version Property.");
259 graphMgmt.setConsistency(propertyKey, ConsistencyModifier.LOCK);
262 seenProps.put(dbPropName, propertyKey);
264 propertyKey = seenProps.get(dbPropName);
266 if (graphMgmt.containsGraphIndex(dbPropName)) {
267 LOGGER.debug(" Index [{}] already existed in the DB. ", dbPropName);
269 if (obj.getIndexedProperties().contains(propName)) {
270 createIndexForProperty(graphMgmt, obj, propName, dbPropName, propertyKey);
272 LOGGER.info("No index added for PropertyKey: [{}]", dbPropName);
280 private static void createIndexForProperty(final JanusGraphManagement graphMgmt, Introspector obj, String propName,
281 String dbPropName, PropertyKey propertyKey) {
282 JanusGraphIndex indexG = null;
283 if (obj.getUniqueProperties().contains(propName)) {
284 LOGGER.info("Add Unique index for PropertyKey: [{}]", dbPropName);
285 indexG = graphMgmt.buildIndex(dbPropName, Vertex.class).addKey(propertyKey).unique()
286 .buildCompositeIndex();
288 LOGGER.info("Add index for PropertyKey: [{}]", dbPropName);
289 indexG = graphMgmt.buildIndex(dbPropName, Vertex.class).addKey(propertyKey)
290 .buildCompositeIndex();
292 if (indexG != null && dbPropName.equals("aai-uri")) {
293 String aai_uri_lock_enabled =
294 AAIConfig.get(AAIConstants.AAI_LOCK_URI_ENABLED, "false");
295 LOGGER.info(" Info:: aai_uri_lock_enabled:" + aai_uri_lock_enabled);
296 if ("true".equals(aai_uri_lock_enabled)) {
297 LOGGER.info("Lock is being set for aai-uri Index.");
298 graphMgmt.setConsistency(indexG, ConsistencyModifier.LOCK);
300 } else if (indexG != null && dbPropName.equals("resource-version")) {
301 String aai_rv_lock_enabled =
302 AAIConfig.get(AAIConstants.AAI_LOCK_RV_ENABLED, "false");
303 LOGGER.info(" Info:: aai_rv_lock_enabled:" + aai_rv_lock_enabled);
304 if ("true".equals(aai_rv_lock_enabled)) {
305 LOGGER.info("Lock is being set for resource-version Index.");
306 graphMgmt.setConsistency(indexG, ConsistencyModifier.LOCK);
312 * Debug method to print current index states.
313 * This can help diagnosing indexes that are stuck in INSTALLED state.
// Logs, at INFO level, the current status of the relation index of every edge label taken
// from the current edge rules.
// Parameter: graph - graph on which the management transaction is opened.
316 public static void printCurrentRelationIndexStates(JanusGraph graph) {
// NOTE(review): no commit/rollback of this management transaction is visible in this excerpt.
317 JanusGraphManagement graphMgmt = graph.openManagement();
318 EdgeIngestor edgeIngestor = SpringContextAware.getBean(EdgeIngestor.class);
// Distinct labels of all current rules; an empty set when getAllCurrentRules() returns null.
320 Set<String> edgeLabels = Optional.ofNullable(edgeIngestor.getAllCurrentRules())
321 .map(collectValues(EdgeRule::getLabel)).orElseGet(HashSet::new);
// NOTE(review): the construct that iterates edgeLabels and binds `label` is not visible here.
324 EdgeLabel relation = graphMgmt.getEdgeLabel(label);
325 RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
326 LOGGER.info("Index state of relation index [{}] is [{}]",label, index.getIndexStatus());
// NOTE(review): the handling inside this catch is not visible in this excerpt.
328 } catch (EdgeRuleNotFoundException e) {
334 * Jointly wait for a set of relation indexes to change status.
335 * Use this after calling {@link JanusGraphManagement#updateIndex(org.janusgraph.core.schema.Index, SchemaAction)}
336 * @param graph the JanusGraph
337 * @param labels the names of the indexes
338 * @param newStatus the new SchemaStatus
// Starts one asynchronous wait per label (CompletableFuture.supplyAsync without an explicit
// executor) and blocks until all of them have completed.
340 private static void awaitRelationIndexStatus(JanusGraph graph, Collection<String> labels, SchemaStatus newStatus) {
// NOTE(review): the doubled ';' below is an empty statement and can be removed.
341 LOGGER.info("Awaiting index status [{}]", newStatus);;
342 CompletableFuture<RelationIndexStatusReport>[] awaits = labels.stream()
344 CompletableFuture.supplyAsync(() -> {
// The label is passed both as relation index name and as relation type name.
346 return ManagementSystem
347 .awaitRelationIndexStatus(graph, label, label)
// NOTE(review): the body of this catch is not visible; no Thread.currentThread().interrupt()
// appears in this excerpt - confirm the interrupt flag is restored.
350 } catch (InterruptedException e) {
// Unchecked generic array creation: toArray yields a raw CompletableFuture[].
355 .toArray(CompletableFuture[]::new);
357 CompletableFuture.allOf(awaits).get();
358 } catch (InterruptedException | ExecutionException e) {
// NOTE(review): the exception is not passed to the logger, so its cause is not part of this message.
359 LOGGER.error("Error while waiting for change in relation index status");
362 LOGGER.info("Completed waiting for index status [{}]", newStatus);
// Applies the given SchemaAction to the relation index of every label. One asynchronous task per
// label is started with CompletableFuture.supplyAsync (no explicit executor) and all tasks are
// awaited before returning from the visible part of the method.
// Parameters:
//   graph        - graph on which the management transaction is opened
//   labels       - edge label names; each relation index is looked up under its label's name
//   updateAction - action handed to JanusGraphManagement.updateIndex
365 private static void updateRelationIndexes(JanusGraph graph, Collection<String> labels, SchemaAction updateAction) {
// NOTE(review): this single management transaction is used from all asynchronous tasks below;
// its commit/rollback is not visible in this excerpt.
366 JanusGraphManagement graphMgmt = graph.openManagement();
368 CompletableFuture<IndexJobFuture>[] awaits = labels.stream()
370 CompletableFuture.supplyAsync(() -> {
371 EdgeLabel relation = graphMgmt.getEdgeLabel(label);
372 RelationTypeIndex index = graphMgmt.getRelationIndex(relation, label);
373 LOGGER.info("Updating relation index [{}] status from [{}] to [{}]", relation.name(), index.getIndexStatus(), updateAction);
// Each task completes with the IndexJobFuture returned by updateIndex.
374 return graphMgmt.updateIndex(index, updateAction);
// Unchecked generic array creation: toArray yields a raw CompletableFuture[].
376 .toArray(CompletableFuture[]::new);
// NOTE(review): this waits for the supplyAsync wrappers only; the IndexJobFuture values they
// return are not themselves awaited in the visible code.
378 CompletableFuture.allOf(awaits).get();
379 LOGGER.info("Completed reindex actions");
380 } catch (InterruptedException | ExecutionException e) {
// NOTE(review): the exception is not passed to the logger, so its cause is not part of this message.
381 LOGGER.error("Error while waiting for change in relation index status");
388 * Radical approach to avoiding index update failures.
389 * Indexes can get stuck in INSTALLED state, when there are stale transactions or JanusGraph instances.
390 * This is because a state change needs to be acknowledged by all instances before transitioning.
// Rolls back the transaction from graph.tx() and every transaction reported as open by the
// graph, then force-closes every open JanusGraph instance whose id does not contain "(current)".
// No call to this private method is visible in this excerpt.
394 private static void killTransactionsAndInstances(JanusGraph graph) {
395 graph.tx().rollback();
// The cast is needed for getOpenTransactions(); it fails with ClassCastException for any other
// JanusGraph implementation.
396 final StandardJanusGraph janusGraph = (StandardJanusGraph) graph;
397 janusGraph.getOpenTransactions().stream().forEach(transaction -> {
398 LOGGER.debug("Closing open transaction [{}] before schema generation", transaction.toString());
399 transaction.rollback();
402 final JanusGraphManagement graphMgtForClosing = graph.openManagement();
404 Set<String> instances = graphMgtForClosing.getOpenInstances();
405 LOGGER.info("Number of open instances: {}", instances.size());
406 LOGGER.info("Currently open instances: [{}]", instances);
// NOTE(review): the source of the stream filtered below is not visible in this excerpt.
// Disabled alternative filter kept from the original:
408 // .filter(instance -> !instance.contains("graphadmin"))
// Skip the instance marked "(current)", i.e. do not close the instance running this code.
409 .filter(instance -> !instance.contains("(current)"))
410 .forEach(instance -> {
411 LOGGER.debug("Closing open JanusGraph instance [{}] before reindexing procedure", instance);
412 graphMgtForClosing.forceCloseInstance(instance);
414 graphMgtForClosing.commit();
417 private static void makeEdgeLabels(JanusGraphManagement graphMgmt, List<String> elementsToReindex, boolean dbNotEmpty) {
419 EdgeIngestor edgeIngestor = SpringContextAware.getBean(EdgeIngestor.class);
421 Set<String> labels = Optional.ofNullable(edgeIngestor.getAllCurrentRules())
422 .map(collectValues(EdgeRule::getLabel)).orElseGet(HashSet::new);
424 labels.forEach(label -> {
425 if (graphMgmt.containsRelationType(label)) {
426 LOGGER.debug(" EdgeLabel [{}] already exists.", label);
428 LOGGER.debug("Making EdgeLabel: [{}]", label);
429 graphMgmt.makeEdgeLabel(label).multiplicity(Multiplicity.MULTI).make();
431 EdgeLabel relation = graphMgmt.getEdgeLabel(label);
432 RelationTypeIndex relationIndex = graphMgmt.getRelationIndex(relation, label);
433 if(relationIndex == null) {
434 LOGGER.debug("Creating edge index for relation: " + label);
435 graphMgmt.buildEdgeIndex(relation, label, Direction.BOTH, graphMgmt.getPropertyKey("aai-node-type"));
437 LOGGER.info("DB not empty. Registering edge [{}] for later reindexing.", label);
438 elementsToReindex.add(relation.name());
440 } else if(!relationIndex.getIndexStatus().equals(SchemaStatus.ENABLED)) {
441 LOGGER.info("Relation index was already created but is not in ENABLED status. Current status: [{}]", relationIndex.getIndexStatus());
442 elementsToReindex.add(label);
444 LOGGER.debug("Edge index for label [{}] already exists", label);
447 } catch (EdgeRuleNotFoundException e) {
448 LOGGER.error("Unable to find all rules {}", LogFormatTools.getStackTop(e));
453 * Returns a function collecting all the values in a {@link com.google.common.collect.Multimap}
454 * given a mapping function
456 * @param f The mapper function
457 * @param <K> The type of key used by the provided {@link com.google.common.collect.Multimap}
458 * @param <V> The type of value used by the provided {@link com.google.common.collect.Multimap}
459 * @param <V0> The type which <V> is mapped to
461 private static <K, V, V0> Function<Multimap<K, V>, Set<V0>> collectValues(Function<V, V0> f) {
462 return as -> as.values().stream().map(f).collect(Collectors.toSet());