2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.cassandra.schema;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.openecomp.sdc.be.config.Configuration;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.dao.cassandra.schema.tables.OldExternalApiEventTableDesc;
import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
import org.openecomp.sdc.common.log.wrappers.Logger;

import com.datastax.driver.core.AbstractTableMetadata;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.IndexMetadata;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.schemabuilder.Alter;
import com.datastax.driver.core.schemabuilder.Create;
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
import com.datastax.driver.core.schemabuilder.SchemaStatement;
import com.google.common.annotations.VisibleForTesting;
49 public class SdcSchemaBuilder {
/**
 * CQL template for creating a keyspace with the SimpleStrategy replication class.
 * The first placeholder receives the keyspace name, the second the replication options.
 */
final static String CREATE_KEYSPACE_SIMPLE_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', %s};";
/**
 * CQL template for creating a keyspace with the NetworkTopologyStrategy replication class.
 * The first placeholder receives the keyspace name, the second the replication options.
 */
final static String CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'NetworkTopologyStrategy', %s};";

/** Logger shared by all static methods of this class. */
private static Logger log = Logger.getLogger(SdcSchemaBuilder.class.getName());
62 //TODO remove after 1707_OS migration
63 private static void handle1707OSMigration(Map<String, Map<String, List<String>>> cassndraMetadata, Map<String, List<ITableDescription>> schemeData){
64 if(cassndraMetadata.containsKey("attaudit")){
65 List<ITableDescription> list = new ArrayList<>();
66 list.add(new OldExternalApiEventTableDesc());
67 schemeData.put("attaudit", list);
72 * the method creates all keyspaces, tables and indexes in case they do not
73 * already exist. the method can be run multiple times. the method uses the
74 * internal enums and external configuration for its operation *
75 * @return true if the create operation was successful
77 public static boolean createSchema() {
78 Cluster cluster = null;
79 Session session = null;
81 log.info("creating Schema for Cassandra.");
82 cluster = SdcSchemaUtils.createCluster();
83 if (cluster == null) {
86 session = cluster.connect();
87 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
88 if (keyspacesMetadateFromCassandra == null) {
89 log.debug("filed to retrive a list of keyspaces from cassndra");
92 log.debug("retrived Cassndra metadata.");
93 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
94 Map<String, Map<String, List<String>>> metadataTablesStructure = getMetadataTablesStructure(keyspacesMetadateFromCassandra);
95 Map<String, List<ITableDescription>> schemeData = getSchemeData();
96 //TODO remove after 1707_OS migration
97 handle1707OSMigration(cassndraMetadata, schemeData);
98 log.info("creating Keyspaces.");
99 for (String keyspace : schemeData.keySet()) {
100 if (!createKeyspace(keyspace, cassndraMetadata, session)) {
103 Map<String, List<String>> keyspaceMetadate = cassndraMetadata.get(keyspace);
104 createTables(schemeData.get(keyspace), keyspaceMetadate, session,metadataTablesStructure.get(keyspace));
107 } catch (Exception e) {
108 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "creating Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
110 if (session != null) {
113 if (cluster != null) {
122 public static boolean deleteSchema() {
123 Cluster cluster = null;
124 Session session = null;
126 log.info("delete Data from Cassandra.");
127 cluster = SdcSchemaUtils.createCluster();
128 if (cluster == null) {
131 session = cluster.connect();
132 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
133 if (keyspacesMetadateFromCassandra == null) {
134 log.debug("filed to retrive a list of keyspaces from cassndra");
137 log.debug("retrived Cassndra metadata.");
138 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
139 cassndraMetadata.forEach((k, v) -> {
140 if (AuditingTypesConstants.TITAN_KEYSPACE.equals(k)) {
141 // session.execute("")
142 } else if (AuditingTypesConstants.ARTIFACT_KEYSPACE.equals(k)) {
144 } else if (AuditingTypesConstants.AUDIT_KEYSPACE.equals(k)) {
149 System.out.println(cassndraMetadata);
151 } catch (Exception e) {
152 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "deleting Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
154 if (session != null) {
157 if (cluster != null) {
169 * the method prcess the metadata retrieved from the cassandra for the
170 * creation of a map conting the names of keyspaces tabls and indexes
171 * already defined in the cassandra keyspacename -> tablename -> list of
174 * @param keyspacesMetadata
176 * @return a map of maps of lists holding parsed info
178 private static Map<String, Map<String, List<String>>> parseKeyspaceMetadata(List<KeyspaceMetadata> keyspacesMetadata) {
179 return keyspacesMetadata.stream()
180 .collect(Collectors.toMap(KeyspaceMetadata::getName,
181 keyspaceMetadata -> keyspaceMetadata.getTables().stream()
182 .collect(Collectors.toMap(AbstractTableMetadata::getName,
183 tableMetadata -> tableMetadata.getIndexes().stream()
184 .map(IndexMetadata::getName)
185 .collect(Collectors.toList())))));
188 private static Map<String, Map<String, List<String>>> getMetadataTablesStructure(
189 List<KeyspaceMetadata> keyspacesMetadata) {
190 return keyspacesMetadata.stream().collect(
191 Collectors.toMap(KeyspaceMetadata::getName,
192 keyspaceMetadata -> keyspaceMetadata.getTables().stream().collect(
193 Collectors.toMap(AbstractTableMetadata::getName,
194 tableMetadata -> tableMetadata.getColumns().stream().map(
195 columnMetadata -> columnMetadata.getName().toLowerCase()).collect(
196 Collectors.toList())))));
200 * the method builds an index name according to a defined logic
204 * @param table: table name
205 * @param column: column name
206 * @return string name of the index
208 private static String createIndexName(String table, String column) {
209 return new StringBuilder().append(table).append("_").append(column).append("_idx").toString();
213 * the method creats all the tables and indexes thet do not already exist
215 * @param iTableDescriptions: a list of table description we want to create
216 * @param keyspaceMetadate: the current tables that exist in the cassandra under this keyspace
217 * @param session: the session object used for the execution of the query.
218 * @param existingTablesMetadata
219 * the current tables columns that exist in the cassandra under this
222 private static void createTables(List<ITableDescription> iTableDescriptions, Map<String, List<String>> keyspaceMetadate, Session session,
223 Map<String, List<String>> existingTablesMetadata) {
224 for (ITableDescription tableDescription : iTableDescriptions) {
225 String tableName = tableDescription.getTableName().toLowerCase();
226 Map<String, ImmutablePair<DataType, Boolean>> columnDescription = tableDescription.getColumnDescription();
227 log.info("creating tables:{}.", tableName);
228 if (keyspaceMetadate == null || !keyspaceMetadate.keySet().contains(tableName)) {
229 Create create = SchemaBuilder.createTable(tableDescription.getKeyspace(),tableDescription.getTableName());
230 for (ImmutablePair<String, DataType> key : tableDescription.primaryKeys()) {
231 create.addPartitionKey(key.getLeft(), key.getRight());
233 if (tableDescription.clusteringKeys() != null) {
234 for (ImmutablePair<String, DataType> key : tableDescription.clusteringKeys()) {
235 create.addClusteringColumn(key.getLeft(), key.getRight());
239 for (String columnName : columnDescription.keySet()) {
240 create.addColumn(columnName, columnDescription.get(columnName).getLeft());
242 log.trace("exacuting :{}", create.toString());
243 session.execute(create);
244 log.info("table:{} created succsesfully.", tableName);
246 log.info("table:{} already exists skiping.", tableName);
247 alterTable(session, existingTablesMetadata, tableDescription, tableName, columnDescription);
249 log.info("keyspacemetdata{}",keyspaceMetadate);
250 List<String> indexNames = (keyspaceMetadate != null && keyspaceMetadate.get(tableName) != null ? keyspaceMetadate.get(tableName) : new ArrayList<>());
251 log.info("table:{} creating indexes.", tableName);
252 for (String columnName : columnDescription.keySet()) {
253 String indexName = createIndexName(tableName, columnName).toLowerCase();
254 if (columnDescription.get(columnName).getRight()) {
255 if (!indexNames.contains(indexName)) {
256 SchemaStatement creatIndex = SchemaBuilder.createIndex(indexName)
257 .onTable(tableDescription.getKeyspace(), tableName).andColumn(columnName);
258 log.info("executing :{}", creatIndex.toString());
259 session.execute(creatIndex);
260 log.info("index:{} created succsesfully.", indexName);
262 log.info("index:{} already exists skiping.", indexName);
271 * check if there are new columns that were added to definition but don't exist in DB
273 * @param existingTablesMetadata
274 * @param tableDescription
276 * @param columnDescription
278 private static void alterTable(Session session, Map<String, List<String>> existingTablesMetadata,
279 ITableDescription tableDescription, String tableName,
280 Map<String, ImmutablePair<DataType, Boolean>> columnDescription) {
281 List<String> definedTableColumns = existingTablesMetadata.get(tableName);
282 //add column to casandra if was added to table definition
283 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> column : columnDescription.entrySet()) {
284 String columnName = column.getKey();
285 if (!definedTableColumns.contains(columnName.toLowerCase())){
286 log.info("Adding new column {} to the table {}", columnName,tableName);
287 Alter alter = SchemaBuilder.alterTable(tableDescription.getKeyspace(),tableDescription.getTableName());
288 SchemaStatement addColumn = alter.addColumn(columnName).type(column.getValue().getLeft());
289 log.trace("exacuting :{}", addColumn.toString());
290 session.execute(addColumn);
296 * the method create the keyspace in case it does not already exists the
297 * method uses configurtion to select the needed replication strategy
299 * @param keyspace: name of the keyspace we want to create
300 * @param cassndraMetadata: cassndra metadata
301 * @param session: the session object used for the execution of the query.
302 * @return true in case the operation was successful
304 private static boolean createKeyspace(String keyspace, Map<String, Map<String, List<String>>> cassndraMetadata, Session session) {
305 List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getKeySpaces();
306 log.info("creating keyspace:{}.", keyspace);
307 if (!cassndraMetadata.keySet().contains(keyspace)) {
308 Optional<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfig = keyspaceConfigList.stream().filter(keyspaceInfo -> keyspace.equalsIgnoreCase(keyspaceInfo.getName())).findFirst();
309 if (keyspaceConfig.isPresent()) {
310 Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo = keyspaceConfig.get();
311 String createKeyspaceQuery = createKeyspaceQuereyString(keyspace, keyspaceInfo);
312 if (createKeyspaceQuery != null) {
313 log.trace("exacuting: {}", createKeyspaceQuery);
314 session.execute(createKeyspaceQuery);
315 log.debug("keyspace:{} created.", keyspace);
322 "keyspace:{} not present in configuration, no info on replications is available. operation failed.",
327 log.debug("keyspace:{} already exists skipping.", keyspace);
333 * the method retries the schem info from the enums describing the tables
335 * @return a map of keyspaces to there table info
337 private static Map<String, List<ITableDescription>> getSchemeData() {
338 Map<String, List<ITableDescription>> tablesByKeyspace = new HashMap<>();
339 Table[] tables = Table.values();
340 for (Table table : tables) {
341 String keyspace = table.getTableDescription().getKeyspace().toLowerCase();
342 List<ITableDescription> list = tablesByKeyspace.get(keyspace);
344 list = new ArrayList<>();
346 list.add(table.getTableDescription());
347 tablesByKeyspace.put(keyspace, list);
349 return tablesByKeyspace;
353 * the methoed creates the query string for the given keyspace the methoed
354 * valides the given data according the the requirments of the replication
355 * strategy SimpleStrategy: "CREATE KEYSPACE IF NOT EXISTS
356 * <keyspaceName></keyspaceName> WITH replication =
357 * {'class':'SimpleStrategy', 'replication_factor':2};" SimpleStrategy:
358 * "CREATE KEYSPACE IF NOT EXISTS <keyspaceName></keyspaceName> WITH
359 * replication = {'class':'NetworkTopologyStrategy', 'dc1' : 2 ,dc2 : 2 };"
362 * name of the keyspace we want to create
363 * @param keyspaceInfo
364 * configuration info regurding the replication of the keyspace
365 * @return a querey string for the creation of the keyspace
367 private static String createKeyspaceQuereyString(String keyspace, Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo) {
369 if (ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
370 List<String> dcList = keyspaceInfo.getReplicationInfo();
371 if (dcList.size() % 2 != 0) {
372 log.error("the supplied replication info is in valid expected dc1,2,dc2,2 etc received:{}", dcList);
375 StringBuilder sb = new StringBuilder();
376 for (int i = 0; i < dcList.size(); i = i + 2) {
377 sb.append("'").append(dcList.get(i)).append("'").append(" : ").append(dcList.get(i + 1));
378 if (i + 2 < dcList.size()) {
383 query = String.format(CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY, keyspace, sb.toString());
384 } else if (ReplicationStrategy.SIMPLE_STRATEGY.getName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
385 List<String> dcList = keyspaceInfo.getReplicationInfo();
386 if (dcList.size() != 1) {
387 log.error("the supplied replication info is in valid expected <number> etc received:{}", dcList);
390 StringBuilder sb = new StringBuilder();
391 sb.append("'replication_factor'").append(" : ").append(dcList.get(0));
392 query = String.format(CREATE_KEYSPACE_SIMPLE_STRATEGY, keyspace, sb.toString());
395 log.error("the suplied replication Strategy is in valide expacted {}/{} etc recived:{}",
396 ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getName(),
397 ReplicationStrategy.SIMPLE_STRATEGY.getName(), keyspaceInfo.getReplicationStrategy());
402 public enum ReplicationStrategy {
403 NETWORK_TOPOLOGY_STRATEGY("NetworkTopologyStrategy"), SIMPLE_STRATEGY("SimpleStrategy");
407 ReplicationStrategy(String name) {
411 public String getName() {