Upgrade SDC from Titan to Janus Graph
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / schema / SdcSchemaBuilder.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  * Modifications copyright (c) 2018 Nokia
20  * ================================================================================
21  */
22
23 package org.openecomp.sdc.be.dao.cassandra.schema;
24
25 import com.datastax.driver.core.*;
26 import com.datastax.driver.core.schemabuilder.Alter;
27 import com.datastax.driver.core.schemabuilder.Create;
28 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
29 import com.datastax.driver.core.schemabuilder.SchemaStatement;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.openecomp.sdc.be.config.Configuration;
32 import org.openecomp.sdc.be.dao.cassandra.schema.tables.OldExternalApiEventTableDesc;
33 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
34 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
36
37 import java.util.*;
38 import java.util.stream.Collectors;
39 import java.util.function.Supplier;
40
41 public class SdcSchemaBuilder {
42
43     /**
44      * creat key space statment for SimpleStrategy
45      */
46     private static final String CREATE_KEYSPACE_SIMPLE_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'SimpleStrategy', %s};";
47     /**
48      * creat key space statment for NetworkTopologyStrategy
49      */
50     private static final String CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY = "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class':'NetworkTopologyStrategy', %s};";
51
52     private static Logger log = Logger.getLogger(SdcSchemaBuilder.class.getName());
53     
54         private SdcSchemaUtils sdcSchemaUtils;
55         private Supplier<Configuration.CassandrConfig> cassandraConfigSupplier;
56
57         public SdcSchemaBuilder(SdcSchemaUtils sdcSchemaUtils, Supplier<Configuration.CassandrConfig> cassandraConfigSupplier) {
58                 this.sdcSchemaUtils = sdcSchemaUtils;
59                 this.cassandraConfigSupplier = cassandraConfigSupplier;
60         }
61
62         //TODO remove after 1707_OS migration
63         private static void handle1707OSMigration(Map<String, Map<String, List<String>>> cassndraMetadata, Map<String, List<ITableDescription>> schemeData){
64                 if(cassndraMetadata.containsKey("attaudit")){
65                         List<ITableDescription> list = new ArrayList<>();
66                         list.add(new OldExternalApiEventTableDesc());
67                         schemeData.put("attaudit", list);
68                 }
69
70         }
71         /**
72          * the method creates all keyspaces, tables and indexes in case they do not
73          * already exist. the method can be run multiple times. the method uses the
74          * internal enums and external configuration for its operation   *
75          * @return true if the create operation was successful
76          */
77         public boolean createSchema() {
78                 boolean res = false;
79                 try(Cluster cluster = sdcSchemaUtils.createCluster();
80                                 Session session = cluster.connect()) {
81                         log.info("creating Schema for Cassandra.");
82                         List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
83                         if (keyspacesMetadateFromCassandra == null) {
84                                 log.debug("filed to retrieve a list of keyspaces from cassandra");
85                                 return false;
86                         }
87                         log.debug("retrieved Cassandra metadata.");
88                         Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
89                         Map<String, Map<String, List<String>>> metadataTablesStructure = getMetadataTablesStructure(keyspacesMetadateFromCassandra);
90                         Map<String, List<ITableDescription>> schemeData = getSchemeData();
91                         //TODO remove after 1707_OS migration
92                         handle1707OSMigration(cassndraMetadata, schemeData);
93                         log.info("creating Keyspaces.");
94                         for (Map.Entry<String, List<ITableDescription>> keyspace : schemeData.entrySet()) {
95                                 if (!createKeyspace(keyspace.getKey(), cassndraMetadata, session)) {
96                                         return false;
97                                 }
98                                 Map<String, List<String>> keyspaceMetadate = cassndraMetadata.get(keyspace.getKey());
99                                 createTables(keyspace.getValue(), keyspaceMetadate, session,metadataTablesStructure.get(keyspace.getKey()));
100                         }
101                         res = true;
102                 } catch (Exception e) {
103             log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "creating Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
104             res = false;
105         }
106                 return res;
107         }
108
109         public boolean deleteSchema() {
110                 boolean res = false;
111                 try(Cluster cluster = sdcSchemaUtils.createCluster();
112                                 Session session = cluster.connect()) {
113                         log.info("delete Data from Cassandra.");
114                         List<KeyspaceMetadata> keyspacesMetadateFromCassandra = cluster.getMetadata().getKeyspaces();
115                         if (keyspacesMetadateFromCassandra == null) {
116                                 log.debug("filed to retrieve a list of keyspaces from cassandra");
117                                 return false;
118                         }
119                         log.debug("retrieved Cassandra metadata.");
120                         Map<String, Map<String, List<String>>> cassndraMetadata = parseKeyspaceMetadata(keyspacesMetadateFromCassandra);
121       log.info("Cassandra Metadata: {}" ,cassndraMetadata);
122       cassndraMetadata.forEach((k, v) -> {
123                                 if (AuditingTypesConstants.janusGraph_KEYSPACE.equals(k)) {
124                                         // session.execute("")
125                                 } else if (AuditingTypesConstants.ARTIFACT_KEYSPACE.equals(k)) {
126
127                                 } else if (AuditingTypesConstants.AUDIT_KEYSPACE.equals(k)) {
128
129                                 }
130                         });
131
132                         System.out.println(cassndraMetadata);
133                         res = true;
134                 } catch (Exception e) {
135             log.error(EcompLoggerErrorCode.SCHEMA_ERROR, "deleting Schema for Cassandra", "Cassandra", e.getLocalizedMessage());
136                 }
137                 return res;
138         }
139
140         /**
141          * the method prcess the metadata retrieved from the cassandra for the
142          * creation of a map conting the names of keyspaces tabls and indexes
143          * already defined in the cassandra keyspacename -> tablename -> list of
144          * indexes info
145          *
146          * @param keyspacesMetadata
147          *            cassndra mmetadata
148          * @return a map of maps of lists holding parsed info
149          */
150         private static Map<String, Map<String, List<String>>> parseKeyspaceMetadata(List<KeyspaceMetadata> keyspacesMetadata) {
151                 return keyspacesMetadata.stream()
152                                 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
153                                                 .stream()
154                                                 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getIndexes()
155                                                                 .stream()
156                                                                 .map(IndexMetadata::getName)
157                                                                 .collect(Collectors.toList())))));
158         }
159
160         private static Map<String, Map<String, List<String>>> getMetadataTablesStructure(
161                         List<KeyspaceMetadata> keyspacesMetadata) {
162                 return keyspacesMetadata.stream()
163                                 .collect(Collectors.toMap(KeyspaceMetadata::getName, keyspaceMetadata -> keyspaceMetadata.getTables()
164                                                 .stream()
165                                                 .collect(Collectors.toMap(AbstractTableMetadata::getName, tableMetadata -> tableMetadata.getColumns()
166                                                                 .stream()
167                                                                 .map(columnMetadata -> columnMetadata.getName().toLowerCase())
168                                                                 .collect(Collectors.toList())))));
169         }
170
171         /**
172          * the method builds an index name according to a defined logic
173          * <table>
174          * _<column>_idx
175          *
176          * @param table: table name
177          * @param column: column name
178          * @return string name of the index
179          */
180         private static String createIndexName(String table, String column) {
181                 return table + "_" + column + "_idx";
182         }
183
184         /**
185          * the method creats all the tables and indexes thet do not already exist
186          *
187          * @param iTableDescriptions: a list of table description we want to create
188          * @param keyspaceMetadate: the current tables that exist in the cassandra under this keyspace
189          * @param session: the session object used for the execution of the query.
190          * @param existingTablesMetadata
191          *                      the current tables columns that exist in the cassandra under this
192          *            keyspace
193          */
194         private static void createTables(List<ITableDescription> iTableDescriptions, Map<String, List<String>> keyspaceMetadate, Session session,
195                         Map<String, List<String>> existingTablesMetadata) {
196                 for (ITableDescription tableDescription : iTableDescriptions) {
197                         String tableName = tableDescription.getTableName().toLowerCase();
198                         Map<String, ImmutablePair<DataType, Boolean>> columnDescription = tableDescription.getColumnDescription();
199                         log.info("creating tables:{}.", tableName);
200                         if (keyspaceMetadate == null || !keyspaceMetadate.keySet().contains(tableName)) {
201                                 Create create = SchemaBuilder.createTable(tableDescription.getKeyspace(),tableDescription.getTableName());
202                                 for (ImmutablePair<String, DataType> key : tableDescription.primaryKeys()) {
203                                         create.addPartitionKey(key.getLeft(), key.getRight());
204                                 }
205                                 if (tableDescription.clusteringKeys() != null) {
206                                         for (ImmutablePair<String, DataType> key : tableDescription.clusteringKeys()) {
207                                                 create.addClusteringColumn(key.getLeft(), key.getRight());
208                                         }
209                                 }
210
211                                 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> entry : columnDescription.entrySet()) {
212                                         create.addColumn(entry.getKey(), entry.getValue().getLeft());
213                                 }
214
215                                 log.trace("exacuting :{}", create);
216                                 session.execute(create);
217                                 log.info("table:{} created succsesfully.", tableName);
218                         } else {
219                                 log.info("table:{} already exists skiping.", tableName);
220                                 alterTable(session, existingTablesMetadata, tableDescription, tableName, columnDescription);
221                         }
222                         log.info("keyspacemetdata{}",keyspaceMetadate);
223                         List<String> indexNames = (keyspaceMetadate != null && keyspaceMetadate.get(tableName) != null ? keyspaceMetadate.get(tableName) : new ArrayList<>());
224                         log.info("table:{} creating indexes.", tableName);
225                         for (Map.Entry<String, ImmutablePair<DataType, Boolean>> description : columnDescription.entrySet()) {
226                                 String indexName = createIndexName(tableName, description.getKey()).toLowerCase();
227                                 if (description.getValue().getRight()) {
228                                         if (!indexNames.contains(indexName)) {
229                                                 SchemaStatement creatIndex = SchemaBuilder.createIndex(indexName)
230                                                                 .onTable(tableDescription.getKeyspace(), tableName).andColumn(description.getKey());
231                                                 log.info("executing :{}", creatIndex);
232                                                 session.execute(creatIndex);
233                                                 log.info("index:{} created succsesfully.", indexName);
234                                         } else {
235                                                 log.info("index:{} already exists skiping.", indexName);
236                                         }
237                                 }
238                         }
239
240
241                 }
242         }
243
244         /**
245          * check if there are new columns that were added to definition but don't exist in DB
246          * @param session
247          * @param existingTablesMetadata
248          * @param tableDescription
249          * @param tableName
250          * @param columnDescription
251          */
252         private static void alterTable(Session session, Map<String, List<String>> existingTablesMetadata,
253                                                                    ITableDescription tableDescription, String tableName,
254                                                                    Map<String, ImmutablePair<DataType, Boolean>> columnDescription) {
255                 List<String> definedTableColumns = existingTablesMetadata.get(tableName);
256                 //add column to casandra if was added to table definition
257                 for (Map.Entry<String, ImmutablePair<DataType, Boolean>> column : columnDescription.entrySet()) {
258                         String columnName = column.getKey();
259                         if (!definedTableColumns.contains(columnName.toLowerCase())){
260                                 log.info("Adding new column {} to the table {}", columnName,tableName);
261                                 Alter alter = SchemaBuilder.alterTable(tableDescription.getKeyspace(),tableDescription.getTableName());
262                                 SchemaStatement addColumn = alter.addColumn(columnName).type(column.getValue().getLeft());
263                                 log.trace("exacuting :{}", addColumn);
264                                 session.execute(addColumn);
265                         }
266                 }
267         }
268
269         /**
270          * the method create the keyspace in case it does not already exists the
271          * method uses configurtion to select the needed replication strategy
272          *
273          * @param keyspace: name of the keyspace we want to create
274          * @param cassndraMetadata: cassndra metadata
275          * @param session: the session object used for the execution of the query.
276          * @return true in case the operation was successful
277          */
278         private boolean createKeyspace(String keyspace, Map<String, Map<String, List<String>>> cassndraMetadata, Session session) {
279                 List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList = cassandraConfigSupplier.get().getKeySpaces();
280                 log.info("creating keyspace:{}.", keyspace);
281                 if (!cassndraMetadata.keySet().contains(keyspace)) {
282                         return createKeyspaceIfNotExists(keyspace, session, keyspaceConfigList);
283                 }
284                 log.info("keyspace:{} already exists skipping.", keyspace);
285                 return true;
286         }
287
288         private static boolean createKeyspaceIfNotExists(String keyspace, Session session, List<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfigList) {
289                 Optional<Configuration.CassandrConfig.KeyspaceConfig> keyspaceConfig = keyspaceConfigList.stream().filter(keyspaceInfo -> keyspace.equalsIgnoreCase(keyspaceInfo.getName())).findFirst();
290                 if (keyspaceConfig.isPresent()) {
291                         return createKeyspaceWhenConfigExists(keyspace, session, keyspaceConfig.get());
292                 }
293                 log.info("keyspace:{} not present in configuration, no info on replications is available. operation failed.", keyspace);
294                 return false;
295         }
296
297         private static boolean createKeyspaceWhenConfigExists(String keyspace, Session session, Configuration.CassandrConfig.KeyspaceConfig keyspaceConfig) {
298                 String createKeyspaceQuery = createKeyspaceQuereyString(keyspace, keyspaceConfig);
299                 if (createKeyspaceQuery != null) {
300                         log.trace("exacuting: {}", createKeyspaceQuery);
301                         session.execute(createKeyspaceQuery);
302                         log.info("keyspace:{} created.", keyspace);
303                         return true;
304                 }
305                 return false;
306         }
307
308         /**
309          * the method retries the schem info from the enums describing the tables
310          *
311          * @return a map of keyspaces to there table info
312          */
313         private static Map<String, List<ITableDescription>> getSchemeData() {
314                 Map<String, List<ITableDescription>> tablesByKeyspace = new HashMap<>();
315                 Table[] tables = Table.values();
316                 for (Table table : tables) {
317                         String keyspace = table.getTableDescription().getKeyspace().toLowerCase();
318                         List<ITableDescription> list = tablesByKeyspace.get(keyspace);
319                         if (list == null) {
320                                 list = new ArrayList<>();
321                         }
322                         list.add(table.getTableDescription());
323                         tablesByKeyspace.put(keyspace, list);
324                 }
325                 return tablesByKeyspace;
326         }
327
328         /**
329          * the methoed creates the query string for the given keyspace the methoed
330          * valides the given data according the the requirments of the replication
331          * strategy SimpleStrategy: "CREATE KEYSPACE IF NOT EXISTS
332          * <keyspaceName></keyspaceName> WITH replication =
333          * {'class':'SimpleStrategy', 'replication_factor':2};" SimpleStrategy:
334          * "CREATE KEYSPACE IF NOT EXISTS <keyspaceName></keyspaceName> WITH
335          * replication = {'class':'NetworkTopologyStrategy', 'dc1' : 2 ,dc2 : 2 };"
336          *
337          * @param keyspace
338          *            name of the keyspace we want to create
339          * @param keyspaceInfo
340          *            configuration info regurding the replication of the keyspace
341          * @return a querey string for the creation of the keyspace
342          */
343         private static String createKeyspaceQuereyString(String keyspace, Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo) {
344                 String query = null;
345                 if (ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
346                         query = createNetworkTopologyStrategy(keyspaceInfo, keyspace);
347                 } else if (ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName().equalsIgnoreCase(keyspaceInfo.getReplicationStrategy())) {
348                         query = createSimpleStrategyQuery(keyspaceInfo, keyspace);
349                 } else {
350                         log.error("the suplied replication Strategy  is in valide expacted {}/{} etc recived:{}",
351                                         ReplicationStrategy.NETWORK_TOPOLOGY_STRATEGY.getStrategyName(),
352                                         ReplicationStrategy.SIMPLE_STRATEGY.getStrategyName(), keyspaceInfo.getReplicationStrategy());
353                 }
354                 return query;
355         }
356
357         private static String createNetworkTopologyStrategy(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
358                 String query = null;
359                 List<String> dcList = keyspaceInfo.getReplicationInfo();
360                 if (dcList.size() % 2 != 0) {
361                         log.error("the supplied replication info is in valid expected dc1,2,dc2,2 etc received:{}", dcList);
362
363                 } else {
364                         StringBuilder sb = new StringBuilder();
365                         for (int i = 0; i < dcList.size(); i = i + 2) {
366                                 sb.append("'").append(dcList.get(i)).append("'").append(" : ").append(dcList.get(i + 1));
367                                 if (i + 2 < dcList.size()) {
368                                         sb.append(",");
369                                 }
370                         }
371                         query = String.format(CREATE_KEYSPACE_NETWORK_TOPOLOGY_STRATEGY, keyspace, sb.toString());
372                 }
373
374                 return query;
375         }
376         private static String createSimpleStrategyQuery(Configuration.CassandrConfig.KeyspaceConfig keyspaceInfo, String keyspace) {
377                 String query = null;
378                 List<String> dcList = keyspaceInfo.getReplicationInfo();
379                 if (dcList.size() != 1) {
380                         log.error("the supplied replication info is in valid expected <number> etc received:{}", dcList);
381
382                 } else {
383                         query = String.format(CREATE_KEYSPACE_SIMPLE_STRATEGY, keyspace, "'replication_factor'" + " : " + dcList.get(0));
384                 }
385                 return query;
386         }
387
388
389
390         public enum ReplicationStrategy {
391                 NETWORK_TOPOLOGY_STRATEGY("NetworkTopologyStrategy"), SIMPLE_STRATEGY("SimpleStrategy");
392
393                 private String strategyName;
394
395                 ReplicationStrategy(String strategyName) {
396                         this.strategyName = strategyName;
397                 }
398
399                 public String getStrategyName() {
400                         return strategyName;
401                 }
402         }
403 }