2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 * Modifications copyright (c) 2018 Nokia
20 * ================================================================================
23 package org.openecomp.sdc.be.dao.cassandra.schema;
25 import com.datastax.driver.core.*;
26 import com.datastax.driver.core.schemabuilder.Alter;
27 import com.datastax.driver.core.schemabuilder.Create;
28 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
29 import com.datastax.driver.core.schemabuilder.SchemaStatement;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.openecomp.sdc.be.config.Configuration;
32 import org.openecomp.sdc.be.dao.cassandra.schema.tables.OldExternalApiEventTableDesc;
33 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
34 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
38 import java.util.stream.Collectors;
39 import java.util.function.Supplier;
41 public class SdcSchemaBuilder {
43 private SdcSchemaUtils sdcSchemaUtils;
44 private Supplier<Configuration.CassandrConfig> cassandraConfigSupplier;
46 public SdcSchemaBuilder(SdcSchemaUtils sdcSchemaUtils, Supplier<Configuration.CassandrConfig> cassandraConfigSupplier) {
47 this.sdcSchemaUtils = sdcSchemaUtils;
48 this.cassandraConfigSupplier = cassandraConfigSupplier;
51 * creat key space statment for SimpleStrategy
53 private static final String CREATE_KEYSPACE_SIMPLE_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', %s};";
55 * creat key space statment for NetworkTopologyStrategy
57 private static final String CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'NetworkTopologyStrategy', %s};";
59 private static Logger log = Logger.getLogger(SdcSchemaBuilder.class.getName());
61 //TODO remove after 1707_OS migration
62 private static void handle1707OSMigration(Map<String, Map<String, List<String>>> cassndraMetadata, Map<String, List<ITableDescription>> schemeData){
63 if(cassndraMetadata.containsKey("attaudit")){
64 List<ITableDescription> list = new ArrayList<>();
65 list.add(new OldExternalApiEventTableDesc());
66 schemeData.put("attaudit", list);
71 * Creates all keyspaces, tables and indexes in case they do not
72 * already exist. The method can be run multiple times. It uses the
73 * internal enums and external configuration for its operation.
74 * @return true if the create operation was successful
// Connects to Cassandra and creates the keyspaces, tables and indexes that the
// schema definition requires.
// NOTE(review): the method is declared boolean but no return statement is
// visible in this excerpt; confirm the returned values against the full file.
76 public boolean createSchema() {
// Cluster and Session are both declared as try-with-resources resources.
78 try(Cluster cluster = sdcSchemaUtils.createCluster();
79 Session session = cluster.connect()) {
80 log.info("creating Schema for Cassandra.");
81 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
// A null keyspace list is reported at debug level.
// NOTE(review): how this branch exits is not visible in this excerpt.
82 if (keyspacesMetadateFromCassandra == null) {
83 log.debug("filed to retrieve a list of keyspaces from cassandra");
86 log.debug("retrieved Cassandra metadata.");
// keyspace name -> table name -> index names
87 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
// keyspace name -> table name -> lower-cased column names
88 Map<String, Map<String, List<String>>> metadataTablesStructure = getMetadataTablesStructure(keyspacesMetadateFromCassandra);
// Table descriptions grouped by lower-cased keyspace name.
89 Map<String, List<ITableDescription>> schemeData = getSchemeData();
90 //TODO remove after 1707_OS migration
91 handle1707OSMigration(cassndraMetadata, schemeData);
92 log.info("creating Keyspaces.");
// For every keyspace of the definition: ensure the keyspace, then its tables.
93 for (Map.Entry<String, List<ITableDescription>> keyspace : schemeData.entrySet()) {
// NOTE(review): the body of this failure branch is not visible in this excerpt.
94 if (!createKeyspace(keyspace.getKey(), cassndraMetadata, session)) {
// Taken from the metadata read before the loop; the lookup may yield null, which createTables checks for.
97 Map<String, List<String>> keyspaceMetadate = cassndraMetadata.get(keyspace.getKey());
98 createTables(keyspace.getValue(), keyspaceMetadate, session,metadataTablesStructure.get(keyspace.getKey()));
101 } catch (Exception e) {
// Logged under SCHEMA_ERROR with the localized message only; the exception object itself is not passed to the logger.
102 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "creating Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
// Connects to Cassandra, reads the keyspace metadata and walks over it.
// NOTE(review): no statement that deletes or drops anything is visible in this
// excerpt; the per-keyspace branches below appear to be placeholders. The
// return statements of this boolean method are not visible either - confirm
// against the full file.
108 public boolean deleteSchema() {
// Cluster and Session are both declared as try-with-resources resources.
110 try(Cluster cluster = sdcSchemaUtils.createCluster();
111 Session session = cluster.connect()) {
112 log.info("delete Data from Cassandra.");
113 List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
// A null keyspace list is reported at debug level.
// NOTE(review): how this branch exits is not visible in this excerpt.
114 if (keyspacesMetadateFromCassandra == null) {
115 log.debug("filed to retrieve a list of keyspaces from cassandra");
118 log.debug("retrieved Cassandra metadata.");
// keyspace name -> table name -> index names
119 Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
120 log.info("Cassandra Metadata: {}" ,cassndraMetadata);
// Dispatch on the keyspace name; the only content visible in the branches is a commented-out call.
121 cassndraMetadata.forEach((k, v) -> {
122 if (AuditingTypesConstants.janusGraph_KEYSPACE.equals(k)) {
123 // session.execute("")
124 } else if (AuditingTypesConstants.ARTIFACT_KEYSPACE.equals(k)) {
126 } else if (AuditingTypesConstants.AUDIT_KEYSPACE.equals(k)) {
// Prints to stdout the same map that was already logged above.
// NOTE(review): consider dropping this in favour of the logger.
131 System.out.println(cassndraMetadata);
133 } catch (Exception e) {
// Logged under SCHEMA_ERROR with the localized message only; the exception object itself is not passed to the logger.
134 log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "deleting Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
140 * the method prcess the metadata retrieved from the cassandra for the
141 * creation of a map conting the names of keyspaces tabls and indexes
142 * already defined in the cassandra keyspacename -> tablename -> list of
145 * @param keyspacesMetadata
147 * @return a map of maps of lists holding parsed info
149 private static Map<String, Map<String, List<String>>> parseKeyspaceMetadata(List<KeyspaceMetadata> keyspacesMetadata) {
150 return keyspacesMetadata.stream()
151 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
153 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getIndexes()
155 .map(IndexMetadata::getName)
156 .collect(Collectors.toList())))));
159 private static Map<String, Map<String, List<String>>> getMetadataTablesStructure(
160 List<KeyspaceMetadata> keyspacesMetadata) {
161 return keyspacesMetadata.stream()
162 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
164 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getColumns()
166 .map(columnMetadata -> columnMetadata.getName().toLowerCase())
167 .collect(Collectors.toList())))));
171 * the method builds an index name according to a defined logic
175 * @param table: table name
176 * @param column: column name
177 * @return string name of the index
179 private static String createIndexName(String table, String column) {
180 return table + "_" + column + "_idx";
184 * the method creats all the tables and indexes thet do not already exist
186 * @param iTableDescriptions: a list of table description we want to create
187 * @param keyspaceMetadate: the current tables that exist in the cassandra under this keyspace
188 * @param session: the session object used for the execution of the query.
189 * @param existingTablesMetadata
190 * the current tables columns that exist in the cassandra under this
193 private static void createTables(List<ITableDescription> iTableDescriptions, Map<String, List<String>> keyspaceMetadate, Session session,
194 Map<String, List<String>> existingTablesMetadata) {
195 for (ITableDescription tableDescription : iTableDescriptions) {
196 String tableName = tableDescription.getTableName().toLowerCase();
197 Map<String, ImmutablePair<DataType, Boolean>> columnDescription = tableDescription.getColumnDescription();
198 log.info("creating tables:{}.", tableName);
199 if (keyspaceMetadate == null || !keyspaceMetadate.keySet().contains(tableName)) {
200 Create create = SchemaBuilder.createTable(tableDescription.getKeyspace(),tableDescription.getTableName());
201 for (ImmutablePair<String, DataType> key : tableDescription.primaryKeys()) {
202 create.addPartitionKey(key.getLeft(), key.getRight());
204 if (tableDescription.clusteringKeys() != null) {
205 for (ImmutablePair<String, DataType> key : tableDescription.clusteringKeys()) {
206 create.addClusteringColumn(key.getLeft(), key.getRight());
210 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> entry : columnDescription.entrySet()) {
211 create.addColumn(entry.getKey(), entry.getValue().getLeft());
214 log.trace("exacuting :{}", create);
215 session.execute(create);
216 log.info("table:{} created succsesfully.", tableName);
218 log.info("table:{} already exists skiping.", tableName);
219 alterTable(session, existingTablesMetadata, tableDescription, tableName, columnDescription);
221 log.info("keyspacemetdata{}",keyspaceMetadate);
222 List<String> indexNames = (keyspaceMetadate != null && keyspaceMetadate.get(tableName) != null ? keyspaceMetadate.get(tableName) : new ArrayList<>());
223 log.info("table:{} creating indexes.", tableName);
224 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> description : columnDescription.entrySet()) {
225 String indexName = createIndexName(tableName, description.getKey()).toLowerCase();
226 if (description.getValue().getRight()) {
227 if (!indexNames.contains(indexName)) {
228 SchemaStatement creatIndex = SchemaBuilder.createIndex(indexName)
229 .onTable(tableDescription.getKeyspace(), tableName).andColumn(description.getKey());
230 log.info("executing :{}", creatIndex);
231 session.execute(creatIndex);
232 log.info("index:{} created succsesfully.", indexName);
234 log.info("index:{} already exists skiping.", indexName);
244 * check if there are new columns that were added to definition but don't exist in DB
246 * @param existingTablesMetadata
247 * @param tableDescription
249 * @param columnDescription
251 private static void alterTable(Session session, Map<String, List<String>> existingTablesMetadata,
252 ITableDescription tableDescription, String tableName,
253 Map<String, ImmutablePair<DataType, Boolean>> columnDescription) {
254 List<String> definedTableColumns = existingTablesMetadata.get(tableName);
255 //add column to casandra if was added to table definition
256 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> column : columnDescription.entrySet()) {
257 String columnName = column.getKey();
258 if (!definedTableColumns.contains(columnName.toLowerCase())){
259 log.info("Adding new column {} to the table {}", columnName,tableName);
260 Alter alter = SchemaBuilder.alterTable(tableDescription.getKeyspace(),tableDescription.getTableName());
261 SchemaStatement addColumn = alter.addColumn(columnName).type(column.getValue().getLeft());
262 log.trace("exacuting :{}", addColumn);
263 session.execute(addColumn);
269 * Creates the keyspace in case it does not already exist. The
270 * method uses configuration to select the needed replication strategy.
272 * @param keyspace: name of the keyspace we want to create
273 * @param cassndraMetadata: Cassandra metadata
274 * @param session: the session object used for the execution of the query.
275 * @return true in case the operation was successful
// Ensures one keyspace: when the metadata map has no entry for it, creation is
// delegated to createKeyspaceIfNotExists; otherwise only a message is logged.
277 private boolean createKeyspace(String keyspace, Map<String, Map<String, List<String>>> cassndraMetadata, Session session) {
// Keyspace definitions are read from the supplied Cassandra configuration on every call.
278 List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList = cassandraConfigSupplier.get().getKeySpaces();
279 log.info("creating keyspace:{}.", keyspace);
// Only a keyspace that is absent from the metadata map is created.
280 if (!cassndraMetadata.keySet().contains(keyspace)) {
281 return createKeyspaceIfNotExists(keyspace, session, keyspaceConfigList);
// NOTE(review): the return statement that follows this log line is not visible
// in this excerpt; confirm the value returned for an already existing keyspace.
283 log.info("keyspace:{} already exists skipping.", keyspace);
// Looks up the configuration entry of the keyspace and, when one is found,
// delegates the creation to createKeyspaceWhenConfigExists.
287 private static boolean createKeyspaceIfNotExists(String keyspace, Session session, List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList) {
// First configuration entry whose name equals the keyspace name, ignoring case.
288 Optional<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfig = keyspaceConfigList.stream().filter(keyspaceInfo -> keyspace.equalsIgnoreCase(keyspaceInfo.getName())).findFirst();
289 if (keyspaceConfig.isPresent()) {
290 return createKeyspaceWhenConfigExists(keyspace, session, keyspaceConfig.get());
// No configuration entry: nothing is executed on the session on this path.
// NOTE(review): the return statement for this path is not visible in this excerpt.
292 log.info("keyspace:{} not present in configuration, no info on replications is available. operation failed.", keyspace);
// Builds the CREATE KEYSPACE statement for the keyspace from its configuration
// entry and executes it on the session.
// NOTE(review): the return statements of this boolean method are not visible
// in this excerpt; confirm the values returned on both paths.
296 private static boolean createKeyspaceWhenConfigExists(String keyspace, Session session, Configuration.CassandrConfig.KeyspaceConfig keyspaceConfig) {
297 String createKeyspaceQuery = createKeyspaceQuereyString(keyspace, keyspaceConfig);
// The statement is executed only when a query string was produced.
298 if (createKeyspaceQuery != null) {
299 log.trace("exacuting: {}", createKeyspaceQuery);
300 session.execute(createKeyspaceQuery);
301 log.info("keyspace:{} created.", keyspace);
308 * the method retries the schem info from the enums describing the tables
310 * @return a map of keyspaces to there table info
312 private static Map<String, List<ITableDescription>> getSchemeData() {
313 Map<String, List<ITableDescription>> tablesByKeyspace = new HashMap<>();
314 Table[] tables = Table.values();
315 for (Table table : tables) {
316 String keyspace = table.getTableDescription().getKeyspace().toLowerCase();
317 List<ITableDescription> list = tablesByKeyspace.get(keyspace);
319 list = new ArrayList<>();
321 list.add(table.getTableDescription());
322 tablesByKeyspace.put(keyspace, list);
324 return tablesByKeyspace;
328 * The method creates the query string for the given keyspace. The method
329 * validates the given data according to the requirements of the replication
330 * strategy. SimpleStrategy: "CREATE KEYSPACE IF NOT EXISTS
331 * <keyspaceName></keyspaceName> WITH replication =
332 * {'class':'SimpleStrategy', 'replication_factor':2};" NetworkTopologyStrategy:
333 * "CREATE KEYSPACE IF NOT EXISTS <keyspaceName></keyspaceName> WITH
334 * replication = {'class':'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : 2};"
337 * name of the keyspace we want to create
338 * @param keyspaceInfo
339 * configuration info regarding the replication of the keyspace
340 * @return a query string for the creation of the keyspace
// Chooses the query builder that matches the replication strategy named in the
// keyspace configuration.
// NOTE(review): the declaration of `query` and the statement returning it are
// not visible in this excerpt; confirm against the full file.
342 private static String createKeyspaceQuereyString(String keyspace, Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo) {
// Strategy names are compared ignoring case.
344 if (ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
345 query = createNetworkTopologyStrategy(keyspaceInfo, keyspace);
346 } else if (ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
347 query = createSimpleStrategyQuery(keyspaceInfo, keyspace);
// Presumably the fallback for a strategy name that matched neither constant
// (the `else` line itself is not visible in this excerpt); no query is assigned here.
349 log.error("the suplied replication Strategy is in valide expacted {}/{} etc recived:{}",
350 ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName(),
351 ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName(), keyspaceInfo.getReplicationStrategy());
// Builds the CREATE KEYSPACE statement for NetworkTopologyStrategy from the
// replication info of the keyspace configuration.
// NOTE(review): the declaration of `query` and the return statements are not
// visible in this excerpt; confirm what is returned on the error path.
356 private static String createNetworkTopologyStrategy(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
// The list is consumed in pairs (index i and i + 1); per the error message the
// expected layout is dc1,2,dc2,2.
358 List<String> dcList = keyspaceInfo.getReplicationInfo();
// An odd number of entries cannot be split into pairs and is logged as an error.
359 if (dcList.size() % 2 != 0) {
360 log.error("the supplied replication info is in valid expected dc1,2,dc2,2 etc received:{}", dcList);
363 StringBuilder sb = new StringBuilder();
364 for (int i = 0; i < dcList.size(); i = i + 2) {
// Emits one pair as 'first' : second, the first element wrapped in single quotes.
365 sb.append("'").append(dcList.get(i)).append("'").append(" : ").append(dcList.get(i + 1));
// True for every pair except the last one.
// NOTE(review): the statement executed here (presumably appending a separator) is not visible in this excerpt.
366 if (i + 2 < dcList.size()) {
// Inserts the keyspace name and the assembled pairs into the NetworkTopologyStrategy template.
370 query = String.format(CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY, keyspace, sb.toString());
// Builds the CREATE KEYSPACE statement for SimpleStrategy from the replication
// info of the keyspace configuration.
// NOTE(review): the declaration of `query` and the return statements are not
// visible in this excerpt; confirm what is returned on the error path.
375 private static String createSimpleStrategyQuery(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
377 List<String> dcList = keyspaceInfo.getReplicationInfo();
// Any size other than exactly one entry is logged as an error.
378 if (dcList.size() != 1) {
379 log.error("the supplied replication info is in valid expected <number> etc received:{}", dcList);
// The single entry is used as the value of 'replication_factor' in the SimpleStrategy template.
382 query = String.format(CREATE_KEYSPACE_SIMPLE_STRATEGY, keyspace, "'replication_factor'" + " : " + dcList.get(0));
389 public enum ReplicationStrategy {
390 NETWORK_TOPOLOGY_STRATEGY("NetworkTopologyStrategy"), SIMPLE_STRATEGY("SimpleStrategy");
392 public String strategyName;
394 private ReplicationStrategy(String strategyName) {
395 this.strategyName = strategyName;
398 public String getStrategyName() {