2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 * Modifications copyright (c) 2018 Nokia
20 * ================================================================================
23 package org.openecomp.sdc.be.dao.cassandra.schema;
25 import com.datastax.driver.core.*;
26 import com.datastax.driver.core.schemabuilder.Alter;
27 import com.datastax.driver.core.schemabuilder.Create;
28 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
29 import com.datastax.driver.core.schemabuilder.SchemaStatement;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.openecomp.sdc.be.config.Configuration;
32 import org.openecomp.sdc.be.dao.cassandra.schema.tables.OldExternalApiEventTableDesc;
33 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
34 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
38 import java.util.stream.Collectors;
39 import java.util.function.Supplier;
41 public class SdcSchemaBuilder {
44 * creat key space statment for SimpleStrategy
46 private static final String CREATE_KEYSPACE_SIMPLE_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', %s};";
48 * creat key space statment for NetworkTopologyStrategy
50 private static final String CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'NetworkTopologyStrategy', %s};";
52 private static Logger log = Logger.getLogger(SdcSchemaBuilder.class.getName());
54 private SdcSchemaUtils sdcSchemaUtils;
55 private Supplier<Configuration.CassandrConfig> cassandraConfigSupplier;
/**
 * Creates a schema builder from its two collaborators.
 *
 * @param sdcSchemaUtils          stored as-is; {@code createCluster()} is invoked on it when a schema operation runs
 * @param cassandraConfigSupplier stored as-is; queried for the configured keyspaces when a keyspace is created
 */
public SdcSchemaBuilder(SdcSchemaUtils sdcSchemaUtils, Supplier<Configuration.CassandrConfig> cassandraConfigSupplier) {
    this.cassandraConfigSupplier = cassandraConfigSupplier;
    this.sdcSchemaUtils = sdcSchemaUtils;
}
62 //TODO remove after 1707_OS migration
63 private static void handle1707OSMigration(Map<String, Map<String, List<String>>> cassndraMetadata, Map<String, List<ITableDescription>> schemeData){
64 if(cassndraMetadata.containsKey("attaudit")){
65 List<ITableDescription> list = new ArrayList<>();
66 list.add(new OldExternalApiEventTableDesc());
67 schemeData.put("attaudit", list);
72 * the method creates all keyspaces, tables and indexes in case they do not
73 * already exist. the method can be run multiple times. the method uses the
74 * internal enums and external configuration for its operation *
75 * @return true if the create operation was successful
77 public boolean createSchema() {
79 try(Cluster cluster = sdcSchemaUtils.createCluster();
80 Session session = cluster.connect()) {
81 log.info("creating Schema for Cassandra.");
82 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
83 if (keyspacesMetadateFromCassandra == null) {
84 log.debug("filed to retrieve a list of keyspaces from cassandra");
87 log.debug("retrieved Cassandra metadata.");
88 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
89 Map<String, Map<String, List<String>>> metadataTablesStructure = getMetadataTablesStructure(keyspacesMetadateFromCassandra);
90 Map<String, List<ITableDescription>> schemeData = getSchemeData();
91 //TODO remove after 1707_OS migration
92 handle1707OSMigration(cassndraMetadata, schemeData);
93 log.info("creating Keyspaces.");
94 for (Map.Entry<String, List<ITableDescription>> keyspace : schemeData.entrySet()) {
95 if (!createKeyspace(keyspace.getKey(), cassndraMetadata, session)) {
98 Map<String, List<String>> keyspaceMetadate = cassndraMetadata.get(keyspace.getKey());
99 createTables(keyspace.getValue(), keyspaceMetadate, session,metadataTablesStructure.get(keyspace.getKey()));
102 } catch (Exception e) {
103 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "creating Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
109 public boolean deleteSchema() {
111 try(Cluster cluster = sdcSchemaUtils.createCluster();
112 Session session = cluster.connect()) {
113 log.info("delete Data from Cassandra.");
114 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
115 if (keyspacesMetadateFromCassandra == null) {
116 log.debug("filed to retrieve a list of keyspaces from cassandra");
119 log.debug("retrieved Cassandra metadata.");
120 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
121 log.info("Cassandra Metadata: {}" ,cassndraMetadata);
122 cassndraMetadata.forEach((k, v) -> {
123 if (AuditingTypesConstants.janusGraph_KEYSPACE.equals(k)) {
124 // session.execute("")
125 } else if (AuditingTypesConstants.ARTIFACT_KEYSPACE.equals(k)) {
127 } else if (AuditingTypesConstants.AUDIT_KEYSPACE.equals(k)) {
132 System.out.println(cassndraMetadata);
134 } catch (Exception e) {
135 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "deleting Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
141 * the method prcess the metadata retrieved from the cassandra for the
142 * creation of a map conting the names of keyspaces tabls and indexes
143 * already defined in the cassandra keyspacename -> tablename -> list of
146 * @param keyspacesMetadata
148 * @return a map of maps of lists holding parsed info
150 private static Map<String, Map<String, List<String>>> parseKeyspaceMetadata(List<KeyspaceMetadata> keyspacesMetadata) {
151 return keyspacesMetadata.stream()
152 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
154 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getIndexes()
156 .map(IndexMetadata::getName)
157 .collect(Collectors.toList())))));
160 private static Map<String, Map<String, List<String>>> getMetadataTablesStructure(
161 List<KeyspaceMetadata> keyspacesMetadata) {
162 return keyspacesMetadata.stream()
163 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
165 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getColumns()
167 .map(columnMetadata -> columnMetadata.getName().toLowerCase())
168 .collect(Collectors.toList())))));
172 * the method builds an index name according to a defined logic
176 * @param table: table name
177 * @param column: column name
178 * @return string name of the index
180 private static String createIndexName(String table, String column) {
181 return table + "_" + column + "_idx";
185 * the method creats all the tables and indexes thet do not already exist
187 * @param iTableDescriptions: a list of table description we want to create
188 * @param keyspaceMetadate: the current tables that exist in the cassandra under this keyspace
189 * @param session: the session object used for the execution of the query.
190 * @param existingTablesMetadata
191 * the current tables columns that exist in the cassandra under this
194 private static void createTables(List<ITableDescription> iTableDescriptions, Map<String, List<String>> keyspaceMetadate, Session session,
195 Map<String, List<String>> existingTablesMetadata) {
196 for (ITableDescription tableDescription : iTableDescriptions) {
197 String tableName = tableDescription.getTableName().toLowerCase();
198 Map<String, ImmutablePair<DataType, Boolean>> columnDescription = tableDescription.getColumnDescription();
199 log.info("creating tables:{}.", tableName);
200 if (keyspaceMetadate == null || !keyspaceMetadate.keySet().contains(tableName)) {
201 Create create = SchemaBuilder.createTable(tableDescription.getKeyspace(),tableDescription.getTableName());
202 for (ImmutablePair<String, DataType> key : tableDescription.primaryKeys()) {
203 create.addPartitionKey(key.getLeft(), key.getRight());
205 if (tableDescription.clusteringKeys() != null) {
206 for (ImmutablePair<String, DataType> key : tableDescription.clusteringKeys()) {
207 create.addClusteringColumn(key.getLeft(), key.getRight());
211 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> entry : columnDescription.entrySet()) {
212 create.addColumn(entry.getKey(), entry.getValue().getLeft());
215 log.trace("exacuting :{}", create);
216 session.execute(create);
217 log.info("table:{} created succsesfully.", tableName);
219 log.info("table:{} already exists skiping.", tableName);
220 alterTable(session, existingTablesMetadata, tableDescription, tableName, columnDescription);
222 log.info("keyspacemetdata{}",keyspaceMetadate);
223 List<String> indexNames = (keyspaceMetadate != null && keyspaceMetadate.get(tableName) != null ? keyspaceMetadate.get(tableName) : new ArrayList<>());
224 log.info("table:{} creating indexes.", tableName);
225 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> description : columnDescription.entrySet()) {
226 String indexName = createIndexName(tableName, description.getKey()).toLowerCase();
227 if (description.getValue().getRight()) {
228 if (!indexNames.contains(indexName)) {
229 SchemaStatement creatIndex = SchemaBuilder.createIndex(indexName)
230 .onTable(tableDescription.getKeyspace(), tableName).andColumn(description.getKey());
231 log.info("executing :{}", creatIndex);
232 session.execute(creatIndex);
233 log.info("index:{} created succsesfully.", indexName);
235 log.info("index:{} already exists skiping.", indexName);
245 * check if there are new columns that were added to definition but don't exist in DB
247 * @param existingTablesMetadata
248 * @param tableDescription
250 * @param columnDescription
252 private static void alterTable(Session session, Map<String, List<String>> existingTablesMetadata,
253 ITableDescription tableDescription, String tableName,
254 Map<String, ImmutablePair<DataType, Boolean>> columnDescription) {
255 List<String> definedTableColumns = existingTablesMetadata.get(tableName);
256 //add column to casandra if was added to table definition
257 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> column : columnDescription.entrySet()) {
258 String columnName = column.getKey();
259 if (!definedTableColumns.contains(columnName.toLowerCase())){
260 log.info("Adding new column {} to the table {}", columnName,tableName);
261 Alter alter = SchemaBuilder.alterTable(tableDescription.getKeyspace(),tableDescription.getTableName());
262 SchemaStatement addColumn = alter.addColumn(columnName).type(column.getValue().getLeft());
263 log.trace("exacuting :{}", addColumn);
264 session.execute(addColumn);
270 * the method create the keyspace in case it does not already exists the
271 * method uses configurtion to select the needed replication strategy
273 * @param keyspace: name of the keyspace we want to create
274 * @param cassndraMetadata: cassndra metadata
275 * @param session: the session object used for the execution of the query.
276 * @return true in case the operation was successful
278 private boolean createKeyspace(String keyspace, Map<String, Map<String, List<String>>> cassndraMetadata, Session session) {
279 List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList = cassandraConfigSupplier.get().getKeySpaces();
280 log.info("creating keyspace:{}.", keyspace);
281 if (!cassndraMetadata.keySet().contains(keyspace)) {
282 return createKeyspaceIfNotExists(keyspace, session, keyspaceConfigList);
284 log.info("keyspace:{} already exists skipping.", keyspace);
288 private static boolean createKeyspaceIfNotExists(String keyspace, Session session, List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList) {
289 Optional<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfig = keyspaceConfigList.stream().filter(keyspaceInfo -> keyspace.equalsIgnoreCase(keyspaceInfo.getName())).findFirst();
290 if (keyspaceConfig.isPresent()) {
291 return createKeyspaceWhenConfigExists(keyspace, session, keyspaceConfig.get());
293 log.info("keyspace:{} not present in configuration, no info on replications is available. operation failed.", keyspace);
297 private static boolean createKeyspaceWhenConfigExists(String keyspace, Session session, Configuration.CassandrConfig.KeyspaceConfig keyspaceConfig) {
298 String createKeyspaceQuery = createKeyspaceQuereyString(keyspace, keyspaceConfig);
299 if (createKeyspaceQuery != null) {
300 log.trace("exacuting: {}", createKeyspaceQuery);
301 session.execute(createKeyspaceQuery);
302 log.info("keyspace:{} created.", keyspace);
309 * the method retries the schem info from the enums describing the tables
311 * @return a map of keyspaces to there table info
313 private static Map<String, List<ITableDescription>> getSchemeData() {
314 Map<String, List<ITableDescription>> tablesByKeyspace = new HashMap<>();
315 Table[] tables = Table.values();
316 for (Table table : tables) {
317 String keyspace = table.getTableDescription().getKeyspace().toLowerCase();
318 List<ITableDescription> list = tablesByKeyspace.get(keyspace);
320 list = new ArrayList<>();
322 list.add(table.getTableDescription());
323 tablesByKeyspace.put(keyspace, list);
325 return tablesByKeyspace;
329 * the methoed creates the query string for the given keyspace the methoed
330 * valides the given data according the the requirments of the replication
331 * strategy SimpleStrategy: "CREATE KEYSPACE IF NOT EXISTS
332 * <keyspaceName></keyspaceName> WITH replication =
333 * {'class':'SimpleStrategy', 'replication_factor':2};" SimpleStrategy:
334 * "CREATE KEYSPACE IF NOT EXISTS <keyspaceName></keyspaceName> WITH
335 * replication = {'class':'NetworkTopologyStrategy', 'dc1' : 2 ,dc2 : 2 };"
338 * name of the keyspace we want to create
339 * @param keyspaceInfo
340 * configuration info regurding the replication of the keyspace
341 * @return a querey string for the creation of the keyspace
343 private static String createKeyspaceQuereyString(String keyspace, Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo) {
345 if (ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
346 query = createNetworkTopologyStrategy(keyspaceInfo, keyspace);
347 } else if (ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
348 query = createSimpleStrategyQuery(keyspaceInfo, keyspace);
350 log.error("the suplied replication Strategy is in valide expacted {}/{} etc recived:{}",
351 ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName(),
352 ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName(), keyspaceInfo.getReplicationStrategy());
357 private static String createNetworkTopologyStrategy(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
359 List<String> dcList = keyspaceInfo.getReplicationInfo();
360 if (dcList.size() % 2 != 0) {
361 log.error("the supplied replication info is in valid expected dc1,2,dc2,2 etc received:{}", dcList);
364 StringBuilder sb = new StringBuilder();
365 for (int i = 0; i < dcList.size(); i = i + 2) {
366 sb.append("'").append(dcList.get(i)).append("'").append(" : ").append(dcList.get(i + 1));
367 if (i + 2 < dcList.size()) {
371 query = String.format(CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY, keyspace, sb.toString());
376 private static String createSimpleStrategyQuery(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
378 List<String> dcList = keyspaceInfo.getReplicationInfo();
379 if (dcList.size() != 1) {
380 log.error("the supplied replication info is in valid expected <number> etc received:{}", dcList);
383 query = String.format(CREATE_KEYSPACE_SIMPLE_STRATEGY, keyspace, "'replication_factor'" + " : " + dcList.get(0));
390 public enum ReplicationStrategy {
391 NETWORK_TOPOLOGY_STRATEGY("NetworkTopologyStrategy"), SIMPLE_STRATEGY("SimpleStrategy");
393 private String strategyName;
395 ReplicationStrategy(String strategyName) {
396 this.strategyName = strategyName;
399 public String getStrategyName() {