2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 * Modifications copyright (c) 2018 Nokia
20 * ================================================================================
22 package org.openecomp.sdc.be.dao.cassandra.schema;
24 import com.datastax.driver.core.AbstractTableMetadata;
25 import com.datastax.driver.core.Cluster;
26 import com.datastax.driver.core.DataType;
27 import com.datastax.driver.core.IndexMetadata;
28 import com.datastax.driver.core.KeyspaceMetadata;
29 import com.datastax.driver.core.Session;
30 import com.datastax.driver.core.schemabuilder.Alter;
31 import com.datastax.driver.core.schemabuilder.Create;
32 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
33 import com.datastax.driver.core.schemabuilder.SchemaStatement;
34 import com.datastax.oss.driver.shaded.guava.common.base.Function;
35 import java.util.ArrayList;
36 import java.util.HashMap;
37 import java.util.List;
39 import java.util.Map.Entry;
40 import java.util.Optional;
41 import java.util.function.Supplier;
42 import java.util.stream.Collectors;
43 import lombok.AllArgsConstructor;
45 import org.apache.commons.lang3.tuple.ImmutablePair;
46 import org.openecomp.sdc.be.config.Configuration;
47 import org.openecomp.sdc.be.dao.cassandra.schema.tables.OldExternalApiEventTableDesc;
48 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
49 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
50 import org.openecomp.sdc.common.log.wrappers.Logger;
52 public class SdcSchemaBuilder {
55 * creat key space statment for SimpleStrategy
57 private static final String CREATE_KEYSPACE_SIMPLE_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', %s};";
59 * creat key space statment for NetworkTopologyStrategy
61 private static final String CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'NetworkTopologyStrategy', %s};";
62 private static Logger log = Logger.getLogger(SdcSchemaBuilder.class.getName());
63 private SdcSchemaUtils sdcSchemaUtils;
64 private Supplier<Configuration.CassandrConfig> cassandraConfigSupplier;
/**
 * Creates a schema builder.
 *
 * @param sdcSchemaUtils          helper used to open the Cassandra cluster connection
 * @param cassandraConfigSupplier supplies the Cassandra configuration that is read when keyspaces are created
 */
public SdcSchemaBuilder(SdcSchemaUtils sdcSchemaUtils, Supplier<Configuration.CassandrConfig> cassandraConfigSupplier) {
    this.cassandraConfigSupplier = cassandraConfigSupplier;
    this.sdcSchemaUtils = sdcSchemaUtils;
}
70 //TODO remove after 1707_OS migration
71 private static void handle1707OSMigration(Map<String, Map<String, List<String>>> cassndraMetadata,
72 Map<String, List<ITableDescription>> schemeData) {
73 if (cassndraMetadata.containsKey("attaudit")) {
74 List<ITableDescription> list = new ArrayList<>();
75 list.add(new OldExternalApiEventTableDesc());
76 schemeData.put("attaudit", list);
81 * the method prcess the metadata retrieved from the cassandra for the creation of a map conting the names of keyspaces tabls and indexes already
82 * defined in the cassandra keyspacename -> tablename -> list of indexes info
84 * @param keyspacesMetadata cassndra mmetadata
85 * @return a map of maps of lists holding parsed info
87 private static Map<String, Map<String, List<String>>> parseKeyspaceMetadata(List<KeyspaceMetadata> keyspacesMetadata) {
88 return keyspacesMetadata.stream().collect(Collectors.toMap(KeyspaceMetadata::getName,
89 keyspaceMetadata -> keyspaceMetadata.getTables().stream().collect(Collectors.toMap(AbstractTableMetadata::getName,
90 tableMetadata -> tableMetadata.getIndexes().stream().map(IndexMetadata::getName).collect(Collectors.toList())))));
93 private static Map<String, Map<String, List<String>>> getMetadataTablesStructure(List<KeyspaceMetadata> keyspacesMetadata) {
94 return keyspacesMetadata.stream().collect(Collectors.toMap(KeyspaceMetadata::getName,
95 keyspaceMetadata -> keyspaceMetadata.getTables().stream().collect(Collectors.toMap(AbstractTableMetadata::getName,
96 tableMetadata -> tableMetadata.getColumns().stream().map(columnMetadata -> columnMetadata.getName().toLowerCase())
97 .collect(Collectors.toList())))));
101 * the method builds an index name according to a defined logic
105 * @param table: table name
106 * @param column: column name
107 * @return string name of the index
109 private static String createIndexName(String table, String column) {
110 return table + "_" + column + "_idx";
114 * the method creats all the tables and indexes thet do not already exist
116 * @param iTableDescriptions: a list of table description we want to create
117 * @param keyspaceMetadata: the current tables that exist in the cassandra under this keyspace
118 * @param session: the session object used for the execution of the query.
119 * @param existingTablesMetadata the current tables columns that exist in the cassandra under this keyspace
121 private static void createTables(List<ITableDescription> iTableDescriptions, Map<String, List<String>> keyspaceMetadata, Session session,
122 Map<String, List<String>> existingTablesMetadata) {
123 for (ITableDescription tableDescription : iTableDescriptions) {
124 String tableName = tableDescription.getTableName().toLowerCase();
125 Map<String, ImmutablePair<DataType, Boolean>> columnDescription = tableDescription.getColumnDescription();
126 log.info("creating tables:{}.", tableName);
127 if (keyspaceMetadata == null || !keyspaceMetadata.containsKey(tableName)) {
128 Create create = SchemaBuilder.createTable(tableDescription.getKeyspace(), tableDescription.getTableName());
129 for (ImmutablePair<String, DataType> key : tableDescription.primaryKeys()) {
130 create.addPartitionKey(key.getLeft(), key.getRight());
132 if (tableDescription.clusteringKeys() != null) {
133 for (ImmutablePair<String, DataType> key : tableDescription.clusteringKeys()) {
134 create.addClusteringColumn(key.getLeft(), key.getRight());
137 final Function<Entry<String, ?>, Boolean> notPrimaryKeyFilter = (Entry<String, ?> entry) -> {
141 return tableDescription.primaryKeys().stream().noneMatch(primaryKeyPair -> primaryKeyPair.getLeft().equals(entry.getKey()));
143 columnDescription.entrySet().stream()
144 .filter(notPrimaryKeyFilter::apply)
145 .forEach(entry -> create.addColumn(entry.getKey(), entry.getValue().getLeft()));
146 log.trace("exacuting :{}", create);
147 session.execute(create);
148 log.info("table:{} created successfully.", tableName);
150 log.info("table:{} already exists, skipping.", tableName);
151 alterTable(session, existingTablesMetadata, tableDescription, tableName, columnDescription);
153 log.info("keyspacemetadata:{}", keyspaceMetadata);
154 List<String> indexNames = (keyspaceMetadata != null && keyspaceMetadata.get(tableName) != null ? keyspaceMetadata.get(tableName)
155 : new ArrayList<>());
156 log.info("table:{} creating indexes.", tableName);
157 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> description : columnDescription.entrySet()) {
158 String indexName = createIndexName(tableName, description.getKey()).toLowerCase();
159 if (description.getValue().getRight()) {
160 if (!indexNames.contains(indexName)) {
161 SchemaStatement creatIndex = SchemaBuilder.createIndex(indexName).onTable(tableDescription.getKeyspace(), tableName)
162 .andColumn(description.getKey());
163 log.info("executing :{}", creatIndex);
164 session.execute(creatIndex);
165 log.info("index:{} created successfully.", indexName);
167 log.info("index:{} already exists, skipping.", indexName);
175 * check if there are new columns that were added to definition but don't exist in DB
178 * @param existingTablesMetadata
179 * @param tableDescription
181 * @param columnDescription
183 private static void alterTable(Session session, Map<String, List<String>> existingTablesMetadata, ITableDescription tableDescription,
184 String tableName, Map<String, ImmutablePair<DataType, Boolean>> columnDescription) {
185 List<String> definedTableColumns = existingTablesMetadata.get(tableName);
186 //add column to casandra if was added to table definition
187 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> column : columnDescription.entrySet()) {
188 String columnName = column.getKey();
189 if (!definedTableColumns.contains(columnName.toLowerCase())) {
190 log.info("Adding new column {} to the table {}", columnName, tableName);
191 Alter alter = SchemaBuilder.alterTable(tableDescription.getKeyspace(), tableDescription.getTableName());
192 SchemaStatement addColumn = alter.addColumn(columnName).type(column.getValue().getLeft());
193 log.trace("executing :{}", addColumn);
194 session.execute(addColumn);
199 private static boolean createKeyspaceIfNotExists(String keyspace, Session session,
200 List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList) {
201 Optional<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfig = keyspaceConfigList.stream()
202 .filter(keyspaceInfo -> keyspace.equalsIgnoreCase(keyspaceInfo.getName())).findFirst();
203 if (keyspaceConfig.isPresent()) {
204 return createKeyspaceWhenConfigExists(keyspace, session, keyspaceConfig.get());
206 log.info("keyspace:{} not present in configuration, no info on replications is available. Operation failed.", keyspace);
210 private static boolean createKeyspaceWhenConfigExists(String keyspace, Session session,
211 Configuration.CassandrConfig.KeyspaceConfig keyspaceConfig) {
212 String createKeyspaceQuery = createKeyspaceQuereyString(keyspace, keyspaceConfig);
213 if (createKeyspaceQuery != null) {
214 log.trace("executing: {}", createKeyspaceQuery);
215 session.execute(createKeyspaceQuery);
216 log.info("keyspace:{} created.", keyspace);
223 * the method retries the schem info from the enums describing the tables
225 * @return a map of keyspaces to there table info
227 private static Map<String, List<ITableDescription>> getSchemeData() {
228 Map<String, List<ITableDescription>> tablesByKeyspace = new HashMap<>();
229 Table[] tables = Table.values();
230 for (Table table : tables) {
231 String keyspace = table.getTableDescription().getKeyspace().toLowerCase();
232 List<ITableDescription> list = tablesByKeyspace.get(keyspace);
234 list = new ArrayList<>();
236 list.add(table.getTableDescription());
237 tablesByKeyspace.put(keyspace, list);
239 return tablesByKeyspace;
243 * the methoed creates the query string for the given keyspace the methoed valides the given data according the the requirments of the replication
244 * strategy SimpleStrategy: "CREATE KEYSPACE IF NOT EXISTS
245 * <keyspaceName></keyspaceName> WITH replication =
246 * {'class':'SimpleStrategy', 'replication_factor':2};" SimpleStrategy: "CREATE KEYSPACE IF NOT EXISTS <keyspaceName></keyspaceName> WITH
247 * replication = {'class':'NetworkTopologyStrategy', 'dc1' : 2 ,dc2 : 2 };"
249 * @param keyspace name of the keyspace we want to create
250 * @param keyspaceInfo configuration info regurding the replication of the keyspace
251 * @return a querey string for the creation of the keyspace
253 private static String createKeyspaceQuereyString(String keyspace, Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo) {
255 if (ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
256 query = createNetworkTopologyStrategy(keyspaceInfo, keyspace);
257 } else if (ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
258 query = createSimpleStrategyQuery(keyspaceInfo, keyspace);
260 log.error("the supplied replication Strategy is invalid; expected {}/{}, received:{}",
261 ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName(), ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName(),
262 keyspaceInfo.getReplicationStrategy());
267 private static String createNetworkTopologyStrategy(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
269 List<String> dcList = keyspaceInfo.getReplicationInfo();
270 if (dcList.size() % 2 != 0) {
271 log.error("the supplied replication info is invalid; expected dc1,2,dc2,2 etc, received:{}", dcList);
273 StringBuilder sb = new StringBuilder();
274 for (int i = 0; i < dcList.size(); i = i + 2) {
275 sb.append("'").append(dcList.get(i)).append("'").append(" : ").append(dcList.get(i + 1));
276 if (i + 2 < dcList.size()) {
280 query = String.format(CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY, keyspace, sb.toString());
285 private static String createSimpleStrategyQuery(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
287 List<String> dcList = keyspaceInfo.getReplicationInfo();
288 if (dcList.size() != 1) {
289 log.error("the supplied replication info is invalid; expected <number>, received:{}", dcList);
291 query = String.format(CREATE_KEYSPACE_SIMPLE_STRATEGY, keyspace, "'replication_factor'" + " : " + dcList.get(0));
297 * the method creates all keyspaces, tables and indexes in case they do not already exist. the method can be run multiple times. the method uses
298 * the internal enums and external configuration for its operation *
300 * @return true if the create operation was successful
302 public boolean createSchema() {
303 try (Cluster cluster = sdcSchemaUtils.createCluster(); Session session = cluster.connect()) {
304 log.info("creating Schema for Cassandra.");
305 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
306 if (keyspacesMetadateFromCassandra == null) {
307 log.debug("filed to retrieve a list of keyspaces from cassandra");
310 log.debug("retrieved Cassandra metadata.");
311 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
312 Map<String, Map<String, List<String>>> metadataTablesStructure = getMetadataTablesStructure(keyspacesMetadateFromCassandra);
313 Map<String, List<ITableDescription>> schemeData = getSchemeData();
314 //TODO remove after 1707_OS migration
315 handle1707OSMigration(cassndraMetadata, schemeData);
316 log.info("creating Keyspaces.");
317 for (Map.Entry<String, List<ITableDescription>> keyspace : schemeData.entrySet()) {
318 if (!createKeyspace(keyspace.getKey(), cassndraMetadata, session)) {
321 Map<String, List<String>> keyspaceMetadate = cassndraMetadata.get(keyspace.getKey());
322 createTables(keyspace.getValue(), keyspaceMetadate, session, metadataTablesStructure.get(keyspace.getKey()));
325 } catch (Exception e) {
326 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "creating Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
331 public boolean deleteSchema() {
333 try (Cluster cluster = sdcSchemaUtils.createCluster(); Session session = cluster.connect()) {
334 log.info("delete Data from Cassandra.");
335 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
336 if (keyspacesMetadateFromCassandra == null) {
337 log.debug("filed to retrieve a list of keyspaces from cassandra");
340 log.debug("retrieved Cassandra metadata.");
341 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
342 log.info("Cassandra Metadata: {}", cassndraMetadata);
343 cassndraMetadata.forEach((k, v) -> {
344 if (AuditingTypesConstants.janusGraph_KEYSPACE.equals(k)) {
345 // session.execute("")
346 } else if (AuditingTypesConstants.ARTIFACT_KEYSPACE.equals(k)) {
347 } else if (AuditingTypesConstants.AUDIT_KEYSPACE.equals(k)) {
350 System.out.println(cassndraMetadata);
352 } catch (Exception e) {
353 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "deleting Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
359 * the method create the keyspace in case it does not already exists the method uses configurtion to select the needed replication strategy
361 * @param keyspace: name of the keyspace we want to create
362 * @param cassndraMetadata: cassndra metadata
363 * @param session: the session object used for the execution of the query.
364 * @return true in case the operation was successful
366 private boolean createKeyspace(String keyspace, Map<String, Map<String, List<String>>> cassndraMetadata, Session session) {
367 List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList = cassandraConfigSupplier.get().getKeySpaces();
368 log.info("creating keyspace:{}.", keyspace);
369 if (!cassndraMetadata.keySet().contains(keyspace)) {
370 return createKeyspaceIfNotExists(keyspace, session, keyspaceConfigList);
372 log.info("keyspace:{} already exists, skipping.", keyspace);
/**
 * Replication strategies supported when creating a keyspace, each paired with the class name used in the CQL statement.
 */
public enum ReplicationStrategy {
    NETWORK_TOPOLOGY_STRATEGY("NetworkTopologyStrategy"), SIMPLE_STRATEGY("SimpleStrategy");

    private final String strategyName;

    ReplicationStrategy(String strategyName) {
        this.strategyName = strategyName;
    }

    /**
     * @return the replication class name used in the CREATE KEYSPACE statement
     */
    public String getStrategyName() {
        return strategyName;
    }
}