Catalog alignment
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / schema / SdcSchemaBuilder.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  * Modifications copyright (c) 2018 Nokia
20  * ================================================================================
21  */
22
23 package org.openecomp.sdc.be.dao.cassandra.schema;
24
25 import com.datastax.driver.core.*;
26 import com.datastax.driver.core.schemabuilder.Alter;
27 import com.datastax.driver.core.schemabuilder.Create;
28 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
29 import com.datastax.driver.core.schemabuilder.SchemaStatement;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.openecomp.sdc.be.config.Configuration;
32 import org.openecomp.sdc.be.dao.cassandra.schema.tables.OldExternalApiEventTableDesc;
33 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
34 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
36
37 import java.util.*;
38 import java.util.stream.Collectors;
39 import java.util.function.Supplier;
40
41 public class SdcSchemaBuilder {
42
43         private SdcSchemaUtils sdcSchemaUtils;
44         private Supplier<Configuration.CassandrConfig> cassandraConfigSupplier;
45
46         public SdcSchemaBuilder(SdcSchemaUtils sdcSchemaUtils, Supplier<Configuration.CassandrConfig> cassandraConfigSupplier) {
47                 this.sdcSchemaUtils = sdcSchemaUtils;
48                 this.cassandraConfigSupplier = cassandraConfigSupplier;
49         }
50         /**
51          * creat key space statment for SimpleStrategy
52          */
53         private static final String CREATE_KEYSPACE_SIMPLE_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', %s};";
54         /**
55          * creat key space statment for NetworkTopologyStrategy
56          */
57         private static final String CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'NetworkTopologyStrategy', %s};";
58
59         private static Logger log = Logger.getLogger(SdcSchemaBuilder.class.getName());
60
61         //TODO remove after 1707_OS migration
62         private static void handle1707OSMigration(Map<String, Map<String, List<String>>> cassndraMetadata, Map<String, List<ITableDescription>> schemeData){
63                 if(cassndraMetadata.containsKey("attaudit")){
64                         List<ITableDescription> list = new ArrayList<>();
65                         list.add(new OldExternalApiEventTableDesc());
66                         schemeData.put("attaudit", list);
67                 }
68                 
69         }
70         /**
71          * the method creates all keyspaces, tables and indexes in case they do not
72          * already exist. the method can be run multiple times. the method uses the
73          * internal enums and external configuration for its operation   * 
74          * @return true if the create operation was successful
75          */
76         public boolean createSchema() {
77                 boolean res = false;
78                 try(Cluster cluster = sdcSchemaUtils.createCluster();
79                                 Session session = cluster.connect()) {
80                         log.info("creating Schema for Cassandra.");
81                         List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
82                         if (keyspacesMetadateFromCassandra == null) {
83                                 log.debug("filed to retrieve a list of keyspaces from cassandra");
84                                 return false;
85                         }
86                         log.debug("retrieved Cassandra metadata.");
87                         Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
88                         Map<String, Map<String, List<String>>> metadataTablesStructure = getMetadataTablesStructure(keyspacesMetadateFromCassandra);
89                         Map<String, List<ITableDescription>> schemeData = getSchemeData();
90                         //TODO remove after 1707_OS migration
91                         handle1707OSMigration(cassndraMetadata, schemeData);
92                         log.info("creating Keyspaces.");
93                         for (Map.Entry<String, List<ITableDescription>> keyspace : schemeData.entrySet()) {
94                                 if (!createKeyspace(keyspace.getKey(), cassndraMetadata, session)) {
95                                         return false;
96                                 }
97                                 Map<String, List<String>> keyspaceMetadate = cassndraMetadata.get(keyspace.getKey());
98                                 createTables(keyspace.getValue(), keyspaceMetadate, session,metadataTablesStructure.get(keyspace.getKey()));
99                         }
100                         res = true;
101                 } catch (Exception e) {
102             log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "creating Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
103             res = false;
104         }
105                 return res;
106         }
107
108         public boolean deleteSchema() {
109                 boolean res = false;
110                 try(Cluster cluster = sdcSchemaUtils.createCluster();
111                                 Session session = cluster.connect()) {
112                         log.info("delete Data from Cassandra.");
113                         List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
114                         if (keyspacesMetadateFromCassandra == null) {
115                                 log.debug("filed to retrieve a list of keyspaces from cassandra");
116                                 return false;
117                         }
118                         log.debug("retrieved Cassandra metadata.");
119                         Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
120       log.info("Cassandra Metadata: {}" ,cassndraMetadata);
121       cassndraMetadata.forEach((k, v) -> {
122                                 if (AuditingTypesConstants.janusGraph_KEYSPACE.equals(k)) {
123                                         // session.execute("")
124                                 } else if (AuditingTypesConstants.ARTIFACT_KEYSPACE.equals(k)) {
125
126                                 } else if (AuditingTypesConstants.AUDIT_KEYSPACE.equals(k)) {
127
128                                 }
129                         });
130
131                         System.out.println(cassndraMetadata);
132                         res = true;
133                 } catch (Exception e) {
134             log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "deleting Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
135                 }
136                 return res;
137         }
138
139         /**
140          * the method prcess the metadata retrieved from the cassandra for the
141          * creation of a map conting the names of keyspaces tabls and indexes
142          * already defined in the cassandra keyspacename -> tablename -> list of
143          * indexes info
144          * 
145          * @param keyspacesMetadata
146          *            cassndra mmetadata
147          * @return a map of maps of lists holding parsed info
148          */
149         private static Map<String, Map<String, List<String>>> parseKeyspaceMetadata(List<KeyspaceMetadata> keyspacesMetadata) {
150                 return keyspacesMetadata.stream()
151                                 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
152                                                 .stream()
153                                                 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getIndexes()
154                                                                 .stream()
155                                                                 .map(IndexMetadata::getName)
156                                                                 .collect(Collectors.toList())))));
157         }
158         
159         private static Map<String, Map<String, List<String>>> getMetadataTablesStructure(
160                         List<KeyspaceMetadata> keyspacesMetadata) {
161                 return keyspacesMetadata.stream()
162                                 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
163                                                 .stream()
164                                                 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getColumns()
165                                                                 .stream()
166                                                                 .map(columnMetadata -> columnMetadata.getName().toLowerCase())
167                                                                 .collect(Collectors.toList())))));
168         }
169
170         /**
171          * the method builds an index name according to a defined logic
172          * <table>
173          * _<column>_idx
174          * 
175          * @param table: table name
176          * @param column: column name
177          * @return string name of the index
178          */
179         private static String createIndexName(String table, String column) {
180                 return table + "_" + column + "_idx";
181         }
182
183         /**
184          * the method creats all the tables and indexes thet do not already exist
185          *
186          * @param iTableDescriptions: a list of table description we want to create
187          * @param keyspaceMetadate: the current tables that exist in the cassandra under this keyspace
188          * @param session: the session object used for the execution of the query.
189          * @param existingTablesMetadata 
190          *                      the current tables columns that exist in the cassandra under this
191          *            keyspace
192          */
193         private static void createTables(List<ITableDescription> iTableDescriptions, Map<String, List<String>> keyspaceMetadate, Session session,
194                         Map<String, List<String>> existingTablesMetadata) {
195                 for (ITableDescription tableDescription : iTableDescriptions) {
196                         String tableName = tableDescription.getTableName().toLowerCase();
197                         Map<String, ImmutablePair<DataType, Boolean>> columnDescription = tableDescription.getColumnDescription();
198                         log.info("creating tables:{}.", tableName);
199                         if (keyspaceMetadate == null || !keyspaceMetadate.keySet().contains(tableName)) {
200                                 Create create = SchemaBuilder.createTable(tableDescription.getKeyspace(),tableDescription.getTableName());
201                                 for (ImmutablePair<String, DataType> key : tableDescription.primaryKeys()) {
202                                         create.addPartitionKey(key.getLeft(), key.getRight());
203                                 }
204                                 if (tableDescription.clusteringKeys() != null) {
205                                         for (ImmutablePair<String, DataType> key : tableDescription.clusteringKeys()) {
206                                                 create.addClusteringColumn(key.getLeft(), key.getRight());
207                                         }
208                                 }
209
210                                 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> entry : columnDescription.entrySet()) {
211                                         create.addColumn(entry.getKey(), entry.getValue().getLeft());
212                                 }
213
214                                 log.trace("exacuting :{}", create);
215                                 session.execute(create);
216                                 log.info("table:{} created succsesfully.", tableName);
217                         } else {
218                                 log.info("table:{} already exists skiping.", tableName);
219                                 alterTable(session, existingTablesMetadata, tableDescription, tableName, columnDescription);
220                         }
221                         log.info("keyspacemetdata{}",keyspaceMetadate);
222                         List<String> indexNames = (keyspaceMetadate != null && keyspaceMetadate.get(tableName) != null ? keyspaceMetadate.get(tableName) : new ArrayList<>());
223                         log.info("table:{} creating indexes.", tableName);
224                         for (Map.Entry<String, ImmutablePair<DataType, Boolean>> description : columnDescription.entrySet()) {
225                                 String indexName = createIndexName(tableName, description.getKey()).toLowerCase();
226                                 if (description.getValue().getRight()) {
227                                         if (!indexNames.contains(indexName)) {
228                                                 SchemaStatement creatIndex = SchemaBuilder.createIndex(indexName)
229                                                                 .onTable(tableDescription.getKeyspace(), tableName).andColumn(description.getKey());
230                                                 log.info("executing :{}", creatIndex);
231                                                 session.execute(creatIndex);
232                                                 log.info("index:{} created succsesfully.", indexName);
233                                         } else {
234                                                 log.info("index:{} already exists skiping.", indexName);
235                                         }
236                                 }
237                         }
238
239
240                 }
241         }
242
243         /**
244          * check if there are new columns that were added to definition but don't exist in DB
245          * @param session
246          * @param existingTablesMetadata
247          * @param tableDescription
248          * @param tableName
249          * @param columnDescription
250          */
251         private static void alterTable(Session session, Map<String, List<String>> existingTablesMetadata,
252                         ITableDescription tableDescription, String tableName,
253                         Map<String, ImmutablePair<DataType, Boolean>> columnDescription) {
254                 List<String> definedTableColumns = existingTablesMetadata.get(tableName);
255                 //add column to casandra if was added to table definition
256                 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> column : columnDescription.entrySet()) {
257                         String columnName = column.getKey();
258                         if (!definedTableColumns.contains(columnName.toLowerCase())){
259                                 log.info("Adding new column {} to the table {}", columnName,tableName);
260                                 Alter alter = SchemaBuilder.alterTable(tableDescription.getKeyspace(),tableDescription.getTableName());
261                                 SchemaStatement addColumn = alter.addColumn(columnName).type(column.getValue().getLeft());
262                                 log.trace("exacuting :{}", addColumn);
263                                 session.execute(addColumn);
264                         }
265                 }
266         }
267
268         /**
269          * the method create the keyspace in case it does not already exists the
270          * method uses configurtion to select the needed replication strategy
271          * 
272          * @param keyspace: name of the keyspace we want to create
273          * @param cassndraMetadata: cassndra metadata
274          * @param session: the session object used for the execution of the query.
275          * @return true in case the operation was successful
276          */
277         private boolean createKeyspace(String keyspace, Map<String, Map<String, List<String>>> cassndraMetadata, Session session) {
278                 List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList = cassandraConfigSupplier.get().getKeySpaces();
279                 log.info("creating keyspace:{}.", keyspace);
280                 if (!cassndraMetadata.keySet().contains(keyspace)) {
281                         return createKeyspaceIfNotExists(keyspace, session, keyspaceConfigList);
282                 }
283                 log.info("keyspace:{} already exists skipping.", keyspace);
284                 return true;
285         }
286
287         private static boolean createKeyspaceIfNotExists(String keyspace, Session session, List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList) {
288                 Optional<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfig = keyspaceConfigList.stream().filter(keyspaceInfo -> keyspace.equalsIgnoreCase(keyspaceInfo.getName())).findFirst();
289                 if (keyspaceConfig.isPresent()) {
290                         return createKeyspaceWhenConfigExists(keyspace, session, keyspaceConfig.get());
291                 }
292                 log.info("keyspace:{} not present in configuration, no info on replications is available. operation failed.", keyspace);
293                 return false;
294         }
295
296         private static boolean createKeyspaceWhenConfigExists(String keyspace, Session session, Configuration.CassandrConfig.KeyspaceConfig keyspaceConfig) {
297                 String createKeyspaceQuery = createKeyspaceQuereyString(keyspace, keyspaceConfig);
298                 if (createKeyspaceQuery != null) {
299                         log.trace("exacuting: {}", createKeyspaceQuery);
300                         session.execute(createKeyspaceQuery);
301                         log.info("keyspace:{} created.", keyspace);
302                         return true;
303                 }
304                 return false;
305         }
306
307         /**
308          * the method retries the schem info from the enums describing the tables
309          * 
310          * @return a map of keyspaces to there table info
311          */
312         private static Map<String, List<ITableDescription>> getSchemeData() {
313                 Map<String, List<ITableDescription>> tablesByKeyspace = new HashMap<>();
314                 Table[] tables = Table.values();
315                 for (Table table : tables) {
316                         String keyspace = table.getTableDescription().getKeyspace().toLowerCase();
317                         List<ITableDescription> list = tablesByKeyspace.get(keyspace);
318                         if (list == null) {
319                                 list = new ArrayList<>();
320                         }
321                         list.add(table.getTableDescription());
322                         tablesByKeyspace.put(keyspace, list);
323                 }
324                 return tablesByKeyspace;
325         }
326
327         /**
328          * the methoed creates the query string for the given keyspace the methoed
329          * valides the given data according the the requirments of the replication
330          * strategy SimpleStrategy: "CREATE KEYSPACE IF NOT EXISTS
331          * <keyspaceName></keyspaceName> WITH replication =
332          * {'class':'SimpleStrategy', 'replication_factor':2};" SimpleStrategy:
333          * "CREATE KEYSPACE IF NOT EXISTS <keyspaceName></keyspaceName> WITH
334          * replication = {'class':'NetworkTopologyStrategy', 'dc1' : 2 ,dc2 : 2 };"
335          * 
336          * @param keyspace
337          *            name of the keyspace we want to create
338          * @param keyspaceInfo
339          *            configuration info regurding the replication of the keyspace
340          * @return a querey string for the creation of the keyspace
341          */
342         private static String createKeyspaceQuereyString(String keyspace, Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo) {
343                 String query = null;
344                 if (ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
345                         query = createNetworkTopologyStrategy(keyspaceInfo, keyspace);
346                 } else if (ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
347                         query = createSimpleStrategyQuery(keyspaceInfo, keyspace);
348                 } else {
349                         log.error("the suplied replication Strategy  is in valide expacted {}/{} etc recived:{}",
350                                         ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName(),
351                                         ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName(), keyspaceInfo.getReplicationStrategy());
352                 }
353                 return query;
354         }
355
356         private static String createNetworkTopologyStrategy(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
357                 String query = null;
358                 List<String> dcList = keyspaceInfo.getReplicationInfo();
359                 if (dcList.size() % 2 != 0) {
360                         log.error("the supplied replication info is in valid expected dc1,2,dc2,2 etc received:{}", dcList);
361
362                 } else {
363                         StringBuilder sb = new StringBuilder();
364                         for (int i = 0; i < dcList.size(); i = i + 2) {
365                                 sb.append("'").append(dcList.get(i)).append("'").append(" : ").append(dcList.get(i + 1));
366                                 if (i + 2 < dcList.size()) {
367                                         sb.append(",");
368                                 }
369                         }
370                         query = String.format(CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY, keyspace, sb.toString());
371                 }
372
373                 return query;
374         }
375         private static String createSimpleStrategyQuery(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
376                 String query = null;
377                 List<String> dcList = keyspaceInfo.getReplicationInfo();
378                 if (dcList.size() != 1) {
379                         log.error("the supplied replication info is in valid expected <number> etc received:{}", dcList);
380
381                 } else {
382                         query = String.format(CREATE_KEYSPACE_SIMPLE_STRATEGY, keyspace, "'replication_factor'" + " : " + dcList.get(0));
383                 }
384                 return query;
385         }
386
387
388
389         public enum ReplicationStrategy {
390                 NETWORK_TOPOLOGY_STRATEGY("NetworkTopologyStrategy"), SIMPLE_STRATEGY("SimpleStrategy");
391
392                 public String strategyName;
393
394                 private ReplicationStrategy(String strategyName) {
395                         this.strategyName = strategyName;
396                 }
397
398                 public String getStrategyName() {
399                         return strategyName;
400                 }
401         }
402
403 }